From 8d03124ba36b247f0d7f8582469e4068d126db79 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Mon, 4 May 2020 17:11:14 +0100 Subject: [PATCH 1/6] [ML] More advanced model snapshot retention options This PR implements the following changes to make ML model snapshot retention more flexible in advance of adding a UI for the feature in an upcoming release. - The default for `model_snapshot_retention_days` for new jobs is now 10 instead of 1 - There is a new job setting, `daily_model_snapshot_retention_after_days`, that defaults to 1 for new jobs and `model_snapshot_retention_days` for pre-7.8 jobs - For days that are older than `model_snapshot_retention_days`, all model snapshots are deleted as before - For days that are in between `daily_model_snapshot_retention_after_days` and `model_snapshot_retention_days` all but the first model snapshot for that day are deleted - The `retain` setting of model snapshots is still respected to allow selected model snapshots to be retained indefinitely Closes #52150 --- .../client/ml/job/config/JobTests.java | 10 +- .../anomaly-detection/apis/get-job.asciidoc | 3 +- .../apis/get-ml-info.asciidoc | 3 +- .../anomaly-detection/apis/put-job.asciidoc | 7 +- .../apis/update-job.asciidoc | 4 + docs/reference/ml/ml-shared.asciidoc | 13 + .../xpack/core/ml/job/config/Job.java | 43 +++- .../xpack/core/ml/job/messages/Messages.java | 4 + .../xpack/core/ml/job/config/JobTests.java | 44 +++- .../core/ml/job/config/JobUpdateTests.java | 37 ++- .../ml/integration/DeleteExpiredDataIT.java | 23 +- .../integration/ModelSnapshotRetentionIT.java | 234 ++++++++++++++++++ .../ml/action/TransportMlInfoAction.java | 5 +- .../TransportRevertModelSnapshotAction.java | 5 +- .../xpack/ml/job/JobManager.java | 8 +- .../ml/job/persistence/JobConfigProvider.java | 1 - .../AbstractExpiredJobDataRemover.java | 15 +- .../ExpiredModelSnapshotsRemover.java | 53 ++-- .../job/retention/ExpiredResultsRemover.java | 9 +- .../AbstractExpiredJobDataRemoverTests.java | 10 +- .../ExpiredModelSnapshotsRemoverTests.java | 41 +-- .../retention/ExpiredResultsRemoverTests.java | 5 +- .../rest-api-spec/test/ml/jobs_crud.yml | 58 +++++ .../rest-api-spec/test/ml/ml_info.yml | 15 +- 24 files changed, 555 insertions(+), 95 deletions(-) create mode 100644 x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java index 64b56c65e19a2..dcd7d4005d891 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java @@ -141,11 +141,17 @@ public static Job.Builder createRandomizedJobBuilder() { if (randomBoolean()) { builder.setBackgroundPersistInterval(TimeValue.timeValueHours(randomIntBetween(1, 24))); } + Long modelSnapshotRetentionDays = null; if (randomBoolean()) { - builder.setModelSnapshotRetentionDays(randomNonNegativeLong()); + modelSnapshotRetentionDays = randomNonNegativeLong(); + builder.setModelSnapshotRetentionDays(modelSnapshotRetentionDays); } if (randomBoolean()) { - builder.setDailyModelSnapshotRetentionAfterDays(randomNonNegativeLong()); + if (modelSnapshotRetentionDays != null) { + builder.setDailyModelSnapshotRetentionAfterDays(randomLongBetween(0, modelSnapshotRetentionDays)); + } else { + builder.setDailyModelSnapshotRetentionAfterDays(randomNonNegativeLong()); + } } if (randomBoolean()) { builder.setResultsRetentionDays(randomNonNegativeLong()); diff --git a/docs/reference/ml/anomaly-detection/apis/get-job.asciidoc b/docs/reference/ml/anomaly-detection/apis/get-job.asciidoc index 0eab920cff7ea..d90b4c7164dea 100644 --- a/docs/reference/ml/anomaly-detection/apis/get-job.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/get-job.asciidoc @@ -137,7 +137,8 @@ The API returns the following results: "model_plot_config" : { "enabled" : true }, - "model_snapshot_retention_days" : 1, + "model_snapshot_retention_days" : 10, + "daily_model_snapshot_retention_after_days" : 1, "custom_settings" : { "created_by" : "ml-module-sample", ... diff --git a/docs/reference/ml/anomaly-detection/apis/get-ml-info.asciidoc b/docs/reference/ml/anomaly-detection/apis/get-ml-info.asciidoc index 7d5133b6a9d43..f4dd43312b1d9 100644 --- a/docs/reference/ml/anomaly-detection/apis/get-ml-info.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/get-ml-info.asciidoc @@ -102,7 +102,8 @@ This is a possible response: }, "model_memory_limit" : "1gb", "categorization_examples_limit" : 4, - "model_snapshot_retention_days" : 1 + "model_snapshot_retention_days" : 10, + "daily_model_snapshot_retention_after_days" : 1 }, "datafeeds" : { "scroll_size" : 1000 diff --git a/docs/reference/ml/anomaly-detection/apis/put-job.asciidoc b/docs/reference/ml/anomaly-detection/apis/put-job.asciidoc index 8e1033643dcd9..9b0b15b154321 100644 --- a/docs/reference/ml/anomaly-detection/apis/put-job.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/put-job.asciidoc @@ -224,6 +224,10 @@ include::{docdir}/ml/ml-shared.asciidoc[tag=custom-settings] include::{docdir}/ml/ml-shared.asciidoc[tag=data-description] //End data_description +`daily_model_snapshot_retention_after_days`:: +(Optional, long) +include::{docdir}/ml/ml-shared.asciidoc[tag=daily-model-snapshot-retention-after-days] + `description`:: (Optional, string) A description of the job. @@ -320,7 +324,8 @@ When the job is created, you receive the following results: "time_field" : "timestamp", "time_format" : "epoch_ms" }, - "model_snapshot_retention_days" : 1, + "model_snapshot_retention_days" : 10, + "daily_model_snapshot_retention_after_days" : 1, "results_index_name" : "shared", "allow_lazy_open" : false } diff --git a/docs/reference/ml/anomaly-detection/apis/update-job.asciidoc b/docs/reference/ml/anomaly-detection/apis/update-job.asciidoc index 492cff08cb6fb..76610264cb347 100644 --- a/docs/reference/ml/anomaly-detection/apis/update-job.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/update-job.asciidoc @@ -82,6 +82,10 @@ close the job, then reopen the job and restart the {dfeed} for the changes to ta (object) include::{docdir}/ml/ml-shared.asciidoc[tag=custom-settings] +`daily_model_snapshot_retention_after_days`:: +(long) +include::{docdir}/ml/ml-shared.asciidoc[tag=daily-model-snapshot-retention-after-days] + `description`:: (string) A description of the job. diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index d1941e4cabe74..faa46d5e5c1a1 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -361,6 +361,19 @@ example, it can contain custom URL information as shown in {ml-docs}/ml-configuring-url.html[Adding custom URLs to {ml} results]. end::custom-settings[] +tag::daily-model-snapshot-retention-after-days[] +Advanced configuration option. A number of days in between 0 and the value of +`model_snapshot_retention_days` after which only one model snapshot per day +is retained instead of all model snapshots. Age is calculated relative to the +timestamp of the newest model snapshot. The default value is `1` for newly +created jobs, which means that all snapshots are retained for one day but +older snapshots are thinned out such that only one per day is retained until +`model_snapshot_retention_days`. For jobs that were created before this +setting was available, the default is the same as that job's +`model_snapshot_retention_days` setting, to preserve the original behavior +that no thinning out of model snapshots is done. +end::daily-model-snapshot-retention-after-days[] + tag::data-description[] The data description defines the format of the input data when you send data to the job by using the <> API. Note that when configure diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index a96c1c63f53c4..fdeebc840858a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -98,7 +98,8 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO */ public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(10, ByteSizeUnit.MB); - public static final long DEFAULT_MODEL_SNAPSHOT_RETENTION_DAYS = 1; + public static final long DEFAULT_MODEL_SNAPSHOT_RETENTION_DAYS = 10; + public static final long DEFAULT_DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS = 1; private static ObjectParser createParser(boolean ignoreUnknownFields) { ObjectParser parser = new ObjectParser<>("job_details", ignoreUnknownFields, Builder::new); @@ -808,6 +809,10 @@ public Builder setModelSnapshotRetentionDays(Long modelSnapshotRetentionDays) { return this; } + public Long getModelSnapshotRetentionDays() { + return modelSnapshotRetentionDays; + } + public Builder setDailyModelSnapshotRetentionAfterDays(Long dailyModelSnapshotRetentionAfterDays) { this.dailyModelSnapshotRetentionAfterDays = dailyModelSnapshotRetentionAfterDays; return this; @@ -1043,9 +1048,6 @@ public void validateInputFields() { checkValidBackgroundPersistInterval(); checkValueNotLessThan(0, RENORMALIZATION_WINDOW_DAYS.getPreferredName(), renormalizationWindowDays); - checkValueNotLessThan(0, MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName(), modelSnapshotRetentionDays); - checkValueNotLessThan(0, DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS.getPreferredName(), - dailyModelSnapshotRetentionAfterDays); checkValueNotLessThan(0, RESULTS_RETENTION_DAYS.getPreferredName(), resultsRetentionDays); if (!MlStrings.isValidId(id)) { @@ -1055,6 +1057,8 @@ public void validateInputFields() { throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_ID_TOO_LONG, MlStrings.ID_LENGTH_LIMIT)); } + validateModelSnapshotRetentionSettings(); + validateGroups(); // Results index name not specified in user input means use the default, so is acceptable in this validation @@ -1076,6 +1080,37 @@ public void validateAnalysisLimitsAndSetDefaults(@Nullable ByteSizeValue maxMode AnalysisLimits.DEFAULT_MODEL_MEMORY_LIMIT_MB); } + /** + * This is meant to be called when a new job is created. + * It sets {@link #dailyModelSnapshotRetentionAfterDays} to the default value if it is not set and the default makes sense. + */ + public void validateModelSnapshotRetentionSettingsAndSetDefaults() { + validateModelSnapshotRetentionSettings(); + if (dailyModelSnapshotRetentionAfterDays == null && + modelSnapshotRetentionDays != null && + modelSnapshotRetentionDays > DEFAULT_DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS) { + dailyModelSnapshotRetentionAfterDays = DEFAULT_DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS; + } + } + + /** + * Validates that {@link #modelSnapshotRetentionDays} and {@link #dailyModelSnapshotRetentionAfterDays} make sense, + * both individually and in combination. + */ + public void validateModelSnapshotRetentionSettings() { + + checkValueNotLessThan(0, MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName(), modelSnapshotRetentionDays); + checkValueNotLessThan(0, DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS.getPreferredName(), + dailyModelSnapshotRetentionAfterDays); + + if (modelSnapshotRetentionDays != null && + dailyModelSnapshotRetentionAfterDays != null && + dailyModelSnapshotRetentionAfterDays > modelSnapshotRetentionDays) { + throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_MODEL_SNAPSHOT_RETENTION_SETTINGS_INCONSISTENT, + dailyModelSnapshotRetentionAfterDays, modelSnapshotRetentionDays)); + } + } + private void validateGroups() { for (String group : this.groups) { if (MlStrings.isValidId(group) == false) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index ecafdfd7e19b0..f39ed85bbb284 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.ml.job.messages; import org.elasticsearch.xpack.core.ml.MachineLearningField; +import org.elasticsearch.xpack.core.ml.job.config.Job; import java.text.MessageFormat; import java.util.Locale; @@ -212,6 +213,9 @@ public final class Messages { "This job would cause a mapping clash with existing field [{0}] - avoid the clash by assigning a dedicated results index"; public static final String JOB_CONFIG_TIME_FIELD_NOT_ALLOWED_IN_ANALYSIS_CONFIG = "data_description.time_field may not be used in the analysis_config"; + public static final String JOB_CONFIG_MODEL_SNAPSHOT_RETENTION_SETTINGS_INCONSISTENT = + "The value of '" + Job.DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS + "' [{0}] cannot be greater than '" + + Job.MODEL_SNAPSHOT_RETENTION_DAYS + "' [{1}]"; public static final String JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE = "job and group names must be unique but job [{0}] and group [{0}] have the same name"; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java index 40fd084315d85..b02e559f5fec2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java @@ -105,7 +105,7 @@ public void testConstructor_GivenEmptyJobConfiguration() { assertNull(job.getModelPlotConfig()); assertNull(job.getRenormalizationWindowDays()); assertNull(job.getBackgroundPersistInterval()); - assertThat(job.getModelSnapshotRetentionDays(), equalTo(1L)); + assertThat(job.getModelSnapshotRetentionDays(), equalTo(10L)); assertNull(job.getDailyModelSnapshotRetentionAfterDays()); assertNull(job.getResultsRetentionDays()); assertNotNull(job.allInputFields()); @@ -168,7 +168,7 @@ public void testEquals_GivenDifferentIds() { Job job1 = builder.build(); builder.setId("bar"); Job job2 = builder.build(); - assertFalse(job1.equals(job2)); + assertNotEquals(job1, job2); } public void testEquals_GivenDifferentRenormalizationWindowDays() { @@ -183,7 +183,7 @@ public void testEquals_GivenDifferentRenormalizationWindowDays() { jobDetails2.setRenormalizationWindowDays(4L); jobDetails2.setAnalysisConfig(createAnalysisConfig()); jobDetails2.setCreateTime(date); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); + assertNotEquals(jobDetails1.build(), jobDetails2.build()); } public void testEquals_GivenDifferentBackgroundPersistInterval() { @@ -198,7 +198,7 @@ public void testEquals_GivenDifferentBackgroundPersistInterval() { jobDetails2.setBackgroundPersistInterval(TimeValue.timeValueSeconds(8000L)); jobDetails2.setAnalysisConfig(createAnalysisConfig()); jobDetails2.setCreateTime(date); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); + assertNotEquals(jobDetails1.build(), jobDetails2.build()); } public void testEquals_GivenDifferentModelSnapshotRetentionDays() { @@ -213,7 +213,7 @@ public void testEquals_GivenDifferentModelSnapshotRetentionDays() { jobDetails2.setModelSnapshotRetentionDays(8L); jobDetails2.setAnalysisConfig(createAnalysisConfig()); jobDetails2.setCreateTime(date); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); + assertNotEquals(jobDetails1.build(), jobDetails2.build()); } public void testEquals_GivenDifferentResultsRetentionDays() { @@ -228,7 +228,7 @@ public void testEquals_GivenDifferentResultsRetentionDays() { jobDetails2.setResultsRetentionDays(4L); jobDetails2.setAnalysisConfig(createAnalysisConfig()); jobDetails2.setCreateTime(date); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); + assertNotEquals(jobDetails1.build(), jobDetails2.build()); } public void testEquals_GivenDifferentCustomSettings() { @@ -240,7 +240,7 @@ public void testEquals_GivenDifferentCustomSettings() { Map customSettings2 = new HashMap<>(); customSettings2.put("key2", "value2"); jobDetails2.setCustomSettings(customSettings2); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); + assertNotEquals(jobDetails1.build(), jobDetails2.build()); } // JobConfigurationTests: @@ -397,6 +397,30 @@ public void testVerify_GivenNegativeModelSnapshotRetentionDays() { assertEquals(errorMessage, e.getMessage()); } + public void testVerify_GivenNegativeDailyModelSnapshotRetentionAfterDays() { + String errorMessage = + Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW, "daily_model_snapshot_retention_after_days", 0, -1); + Job.Builder builder = buildJobBuilder("foo"); + builder.setDailyModelSnapshotRetentionAfterDays(-1L); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, builder::build); + + assertEquals(errorMessage, e.getMessage()); + } + + public void testVerify_GivenInconsistentModelSnapshotRetentionSettings() { + long dailyModelSnapshotRetentionAfterDays = randomLongBetween(1, Long.MAX_VALUE); + long modelSnapshotRetentionDays = randomLongBetween(0, dailyModelSnapshotRetentionAfterDays - 1); + String errorMessage = + Messages.getMessage(Messages.JOB_CONFIG_MODEL_SNAPSHOT_RETENTION_SETTINGS_INCONSISTENT, + dailyModelSnapshotRetentionAfterDays, modelSnapshotRetentionDays); + Job.Builder builder = buildJobBuilder("foo"); + builder.setDailyModelSnapshotRetentionAfterDays(dailyModelSnapshotRetentionAfterDays); + builder.setModelSnapshotRetentionDays(modelSnapshotRetentionDays); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, builder::build); + + assertEquals(errorMessage, e.getMessage()); + } + public void testVerify_GivenLowBackgroundPersistInterval() { String errorMessage = Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW, "background_persist_interval", 3600, 3599); Job.Builder builder = buildJobBuilder("foo"); @@ -628,7 +652,11 @@ public static Job createRandomizedJob() { builder.setModelSnapshotRetentionDays(randomNonNegativeLong()); } if (randomBoolean()) { - builder.setDailyModelSnapshotRetentionAfterDays(randomNonNegativeLong()); + if (builder.getModelSnapshotRetentionDays() != null) { + builder.setDailyModelSnapshotRetentionAfterDays(randomLongBetween(0, builder.getModelSnapshotRetentionDays())); + } else { + builder.setDailyModelSnapshotRetentionAfterDays(randomNonNegativeLong()); + } } if (randomBoolean()) { builder.setResultsRetentionDays(randomNonNegativeLong()); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java index 7b906f6c33c44..fdcb990957f7c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java @@ -73,11 +73,31 @@ public JobUpdate createRandom(String jobId, @Nullable Job job) { if (randomBoolean()) { update.setBackgroundPersistInterval(TimeValue.timeValueHours(randomIntBetween(1, 24))); } - if (randomBoolean()) { - update.setModelSnapshotRetentionDays(randomNonNegativeLong()); + // It's quite complicated to ensure updates of the two model snapshot retention settings are valid: + // - We might be updating both, one or neither. + // - If we update both the values in the update must be consistent. + // - If we update just one then that one must be consistent with the value of the other one in the job that's being updated. + Long maxValidDailyModelSnapshotRetentionAfterDays = (job == null) ? null : job.getModelSnapshotRetentionDays(); + boolean willSetModelSnapshotRetentionDays = randomBoolean(); + boolean willSetDailyModelSnapshotRetentionAfterDays = randomBoolean(); + if (willSetModelSnapshotRetentionDays) { + if (willSetDailyModelSnapshotRetentionAfterDays) { + maxValidDailyModelSnapshotRetentionAfterDays = randomNonNegativeLong(); + update.setModelSnapshotRetentionDays(maxValidDailyModelSnapshotRetentionAfterDays); + } else { + if (job == null || job.getDailyModelSnapshotRetentionAfterDays() == null) { + update.setModelSnapshotRetentionDays(randomNonNegativeLong()); + } else { + update.setModelSnapshotRetentionDays(randomLongBetween(job.getDailyModelSnapshotRetentionAfterDays(), Long.MAX_VALUE)); + } + } } - if (randomBoolean()) { - update.setDailyModelSnapshotRetentionAfterDays(randomNonNegativeLong()); + if (willSetDailyModelSnapshotRetentionAfterDays) { + if (maxValidDailyModelSnapshotRetentionAfterDays != null) { + update.setDailyModelSnapshotRetentionAfterDays(randomLongBetween(0, maxValidDailyModelSnapshotRetentionAfterDays)); + } else { + update.setDailyModelSnapshotRetentionAfterDays(randomNonNegativeLong()); + } } if (randomBoolean()) { update.setResultsRetentionDays(randomNonNegativeLong()); @@ -215,7 +235,10 @@ public void testMergeWithJob() { updateBuilder.setAnalysisLimits(analysisLimits); updateBuilder.setBackgroundPersistInterval(TimeValue.timeValueHours(randomIntBetween(1, 24))); updateBuilder.setResultsRetentionDays(randomNonNegativeLong()); - updateBuilder.setModelSnapshotRetentionDays(randomNonNegativeLong()); + // The createRandom() method tests the complex interactions between these next two, so this test can always update both + long newModelSnapshotRetentionDays = randomNonNegativeLong(); + updateBuilder.setModelSnapshotRetentionDays(newModelSnapshotRetentionDays); + updateBuilder.setDailyModelSnapshotRetentionAfterDays(randomLongBetween(0, newModelSnapshotRetentionDays)); updateBuilder.setRenormalizationWindowDays(randomNonNegativeLong()); updateBuilder.setCategorizationFilters(categorizationFilters); updateBuilder.setCustomSettings(customSettings); @@ -224,7 +247,7 @@ public void testMergeWithJob() { JobUpdate update = updateBuilder.build(); Job.Builder jobBuilder = new Job.Builder("foo"); - jobBuilder.setGroups(Arrays.asList("group-1")); + jobBuilder.setGroups(Collections.singletonList("group-1")); Detector.Builder d1 = new Detector.Builder("info_content", "domain"); d1.setOverFieldName("mlcategory"); Detector.Builder d2 = new Detector.Builder("min", "field"); @@ -281,7 +304,7 @@ public void testIsAutodetectProcessUpdate() { assertTrue(update.isAutodetectProcessUpdate()); update = new JobUpdate.Builder("foo").setDetectorUpdates(Collections.singletonList(mock(JobUpdate.DetectorUpdate.class))).build(); assertTrue(update.isAutodetectProcessUpdate()); - update = new JobUpdate.Builder("foo").setGroups(Arrays.asList("bar")).build(); + update = new JobUpdate.Builder("foo").setGroups(Collections.singletonList("bar")).build(); assertTrue(update.isAutodetectProcessUpdate()); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index cdd12a8fe10e5..612fe23edbdad 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -109,11 +109,17 @@ public void testDeleteExpiredData() throws Exception { } ActionFuture indexUnusedStateDocsResponse = bulkRequestBuilder.execute(); - registerJob(newJobBuilder("no-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(1000L)); - registerJob(newJobBuilder("results-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(1000L)); - registerJob(newJobBuilder("snapshots-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L)); - registerJob(newJobBuilder("snapshots-retention-with-retain").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L)); - registerJob(newJobBuilder("results-and-snapshots-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(2L)); + // These jobs don't thin out model state; ModelSnapshotRetentionIT tests that + registerJob(newJobBuilder("no-retention") + .setResultsRetentionDays(null).setModelSnapshotRetentionDays(1000L).setDailyModelSnapshotRetentionAfterDays(1000L)); + registerJob(newJobBuilder("results-retention") + .setResultsRetentionDays(1L).setModelSnapshotRetentionDays(1000L).setDailyModelSnapshotRetentionAfterDays(1000L)); + registerJob(newJobBuilder("snapshots-retention") + .setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L).setDailyModelSnapshotRetentionAfterDays(2L)); + registerJob(newJobBuilder("snapshots-retention-with-retain") + .setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L).setDailyModelSnapshotRetentionAfterDays(2L)); + registerJob(newJobBuilder("results-and-snapshots-retention") + .setResultsRetentionDays(1L).setModelSnapshotRetentionDays(2L).setDailyModelSnapshotRetentionAfterDays(2L)); List shortExpiryForecastIds = new ArrayList<>(); @@ -171,9 +177,10 @@ public void testDeleteExpiredData() throws Exception { // Refresh to ensure the snapshot timestamp updates are visible refresh("*"); - // We need to wait a second to ensure the second time around model snapshots will have a different ID (it depends on epoch seconds) - // FIXME it would be better to wait for something concrete instead of wait for time to elapse - assertBusy(() -> {}, 1, TimeUnit.SECONDS); + // We need to wait for the clock to tick to a new second to ensure the second time + // around model snapshots will have a different ID (it depends on epoch seconds) + long before = System.currentTimeMillis() / 1000; + assertBusy(() -> assertNotEquals(before, System.currentTimeMillis() / 1000), 1, TimeUnit.SECONDS); for (Job.Builder job : getJobs()) { // Run up to now diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java new file mode 100644 index 0000000000000..bc3afbc338187 --- /dev/null +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java @@ -0,0 +1,234 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexAction; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState; +import org.junit.After; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +public class ModelSnapshotRetentionIT extends MlNativeAutodetectIntegTestCase { + + private static final long MS_IN_DAY = TimeValue.timeValueDays(1).millis(); + + @After + public void cleanUpTest() { + cleanUp(); + } + + public void testModelSnapshotRetentionNoDailyThinning() throws Exception { + + String jobId = "no-daily-thinning"; + + int numDocsPerSnapshot = randomIntBetween(1, 4); + int numSnapshotsPerDay = randomIntBetween(1, 4); + int modelSnapshotRetentionDays = randomIntBetween(1, 10); + int numPriorDays = randomIntBetween(1, 5); + + createJob(jobId, modelSnapshotRetentionDays, modelSnapshotRetentionDays); + + List expectedModelSnapshotDocIds = new ArrayList<>(); + List expectedModelStateDocIds = new ArrayList<>(); + + long now = System.currentTimeMillis(); + long timeMs = now; + // We add 1 to make the maths easier, because the retention period includes + // the cutoff time, yet is measured from the timestamp of the latest snapshot + int numSnapshotsTotal = numSnapshotsPerDay * (modelSnapshotRetentionDays + numPriorDays) + 1; + for (int i = numSnapshotsTotal; i > 0; --i) { + String snapshotId = String.valueOf(i); + createModelSnapshot(jobId, snapshotId, new Date(timeMs), numDocsPerSnapshot, i == numSnapshotsTotal); + if (timeMs >= now - MS_IN_DAY * modelSnapshotRetentionDays) { + expectedModelSnapshotDocIds.add(ModelSnapshot.documentId(jobId, snapshotId)); + for (int j = 1; j <= numDocsPerSnapshot; ++j) { + expectedModelStateDocIds.add(ModelState.documentId(jobId, snapshotId, j)); + } + } + timeMs -= (MS_IN_DAY / numSnapshotsPerDay); + } + refresh(".ml*"); + + deleteExpiredData(); + + Collections.sort(expectedModelSnapshotDocIds); + Collections.sort(expectedModelStateDocIds); + assertThat(getAvailableModelSnapshotDocIds(jobId), is(expectedModelSnapshotDocIds)); + assertThat(getAvailableModelStateDocIds(), is(expectedModelStateDocIds)); + } + + public void testModelSnapshotRetentionWithDailyThinning() throws Exception { + + String jobId = "with-daily-thinning"; + + int numDocsPerSnapshot = randomIntBetween(1, 4); + int numSnapshotsPerDay = randomIntBetween(1, 4); + int modelSnapshotRetentionDays = randomIntBetween(2, 10); + int numPriorDays = randomIntBetween(1, 5); + int dailyModelSnapshotRetentionAfterDays = randomIntBetween(0, modelSnapshotRetentionDays - 1); + + createJob(jobId, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays); + + List expectedModelSnapshotDocIds = new ArrayList<>(); + List expectedModelStateDocIds = new ArrayList<>(); + + long now = System.currentTimeMillis(); + long timeMs = now; + // We add 1 to make the maths easier, because the retention period includes + // the cutoff time, yet is measured from the timestamp of the latest snapshot + int numSnapshotsTotal = numSnapshotsPerDay * (modelSnapshotRetentionDays + numPriorDays) + 1; + for (int i = numSnapshotsTotal; i > 0; --i) { + String snapshotId = String.valueOf(i); + createModelSnapshot(jobId, snapshotId, new Date(timeMs), numDocsPerSnapshot, i == numSnapshotsTotal); + // We should retain: + // - Nothing older than modelSnapshotRetentionDays + // - Everything newer than dailyModelSnapshotRetentionAfterDays + // - The first snapshot of each day in between + if (timeMs >= now - MS_IN_DAY * modelSnapshotRetentionDays && + (timeMs >= now - MS_IN_DAY * dailyModelSnapshotRetentionAfterDays || + (now - timeMs) % MS_IN_DAY < MS_IN_DAY / numSnapshotsPerDay)) { + expectedModelSnapshotDocIds.add(ModelSnapshot.documentId(jobId, snapshotId)); + for (int j = 1; j <= numDocsPerSnapshot; ++j) { + expectedModelStateDocIds.add(ModelState.documentId(jobId, snapshotId, j)); + } + } + timeMs -= (MS_IN_DAY / numSnapshotsPerDay); + } + refresh(".ml*"); + + deleteExpiredData(); + + Collections.sort(expectedModelSnapshotDocIds); + Collections.sort(expectedModelStateDocIds); + assertThat(getAvailableModelSnapshotDocIds(jobId), is(expectedModelSnapshotDocIds)); + assertThat(getAvailableModelStateDocIds(), is(expectedModelStateDocIds)); + } + + private List getAvailableModelSnapshotDocIds(String jobId) { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)); + QueryBuilder query = QueryBuilders.boolQuery() + .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName())); + searchRequest.source(new SearchSourceBuilder().query(query).size(10000)); + + return getDocIdsFromSearch(searchRequest); + } + + private List getAvailableModelStateDocIds() { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(AnomalyDetectorsIndex.jobStateIndexPattern()); + searchRequest.source(new SearchSourceBuilder().size(10000)); + + return getDocIdsFromSearch(searchRequest); + } + + private List getDocIdsFromSearch(SearchRequest searchRequest) { + + SearchResponse searchResponse = client().execute(SearchAction.INSTANCE, searchRequest).actionGet(); + + List docIds = new ArrayList<>(); + assertThat(searchResponse.getHits(), notNullValue()); + for (SearchHit searchHit : searchResponse.getHits().getHits()) { + docIds.add(searchHit.getId()); + } + Collections.sort(docIds); + return docIds; + } + + private void createJob(String jobId, long modelSnapshotRetentionDays, long dailyModelSnapshotRetentionAfterDays) { + Detector detector = new Detector.Builder("count", null).build(); + + Job.Builder builder = new Job.Builder(); + builder.setId(jobId); + builder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector))); + builder.setDataDescription(new DataDescription.Builder()); + builder.setModelSnapshotRetentionDays(modelSnapshotRetentionDays); + builder.setDailyModelSnapshotRetentionAfterDays(dailyModelSnapshotRetentionAfterDays); + + PutJobAction.Request putJobRequest = new PutJobAction.Request(builder); + client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); + } + + private void createModelSnapshot(String jobId, String snapshotId, Date timestamp, int numDocs, boolean isActive) throws IOException { + persistModelSnapshotDoc(jobId, snapshotId, timestamp, numDocs, isActive); + persistModelStateDocs(jobId, snapshotId, numDocs); + if (isActive) { + JobUpdate jobUpdate = new JobUpdate.Builder(jobId).setModelSnapshotId(snapshotId).build(); + UpdateJobAction.Request updateJobRequest = UpdateJobAction.Request.internal(jobId, jobUpdate); + client().execute(UpdateJobAction.INSTANCE, updateJobRequest).actionGet(); + } + } + + private void persistModelSnapshotDoc(String jobId, String snapshotId, Date timestamp, int numDocs, + boolean immediateRefresh) throws IOException { + ModelSnapshot.Builder modelSnapshotBuilder = new ModelSnapshot.Builder(); + modelSnapshotBuilder.setJobId(jobId).setSnapshotId(snapshotId).setTimestamp(timestamp).setSnapshotDocCount(numDocs); + + IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId)); + indexRequest.id(ModelSnapshot.documentId(jobId, snapshotId)); + if (immediateRefresh) { + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + } + XContentBuilder xContentBuilder = JsonXContent.contentBuilder(); + modelSnapshotBuilder.build().toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); + indexRequest.source(xContentBuilder); + + IndexResponse indexResponse = client().execute(IndexAction.INSTANCE, indexRequest).actionGet(); + assertThat(indexResponse.getResult(), is(DocWriteResponse.Result.CREATED)); + } + + private void persistModelStateDocs(String jobId, String snapshotId, int numDocs) { + assertThat(numDocs, greaterThan(0)); + + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 1; i <= numDocs; ++i) { + IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()); + indexRequest.id(ModelState.documentId(jobId, snapshotId, i)); + // The exact contents of the model state doesn't matter - we are not going to try and restore it + indexRequest.source(Collections.singletonMap("compressed", Collections.singletonList("foo"))); + bulkRequest.add(indexRequest); + } + + BulkResponse bulkResponse = client().execute(BulkAction.INSTANCE, bulkRequest).actionGet(); + assertFalse(bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java index dd55a07ebdce4..56d7d2a655173 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.TimeoutException; @@ -80,10 +81,12 @@ private boolean upgradeMode() { } private Map anomalyDetectorsDefaults() { - Map defaults = new HashMap<>(); + Map defaults = new LinkedHashMap<>(); defaults.put(AnalysisLimits.MODEL_MEMORY_LIMIT.getPreferredName(), defaultModelMemoryLimit()); defaults.put(AnalysisLimits.CATEGORIZATION_EXAMPLES_LIMIT.getPreferredName(), AnalysisLimits.DEFAULT_CATEGORIZATION_EXAMPLES_LIMIT); defaults.put(Job.MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName(), Job.DEFAULT_MODEL_SNAPSHOT_RETENTION_DAYS); + defaults.put(Job.DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS.getPreferredName(), + Job.DEFAULT_DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS); try { defaults.put(CategorizationAnalyzerConfig.CATEGORIZATION_ANALYZER.getPreferredName(), CategorizationAnalyzerConfig.buildDefaultCategorizationAnalyzer(Collections.emptyList()) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java index d209060a823b4..7f6c202fa345a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java @@ -143,10 +143,7 @@ private ActionListener wrapDeleteOldDataList // acknowledged responses return ActionListener.wrap(response -> { Date deleteAfter = modelSnapshot.getLatestResultTimeStamp(); - logger.debug("Removing intervening records: last record: " + deleteAfter + ", last result: " - + modelSnapshot.getLatestResultTimeStamp()); - - logger.info("Deleting results after '" + deleteAfter + "'"); + logger.info("[{}] Removing intervening records after reverting model: deleting results after [{}]", jobId, deleteAfter); JobDataDeleter dataDeleter = new JobDataDeleter(client, jobId); dataDeleter.deleteResultsFromTime(deleteAfter.getTime() + 1, new ActionListener() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 4bd96ea4d015b..2b385d1fa011b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -240,10 +240,12 @@ static void validateCategorizationAnalyzer(Job.Builder jobBuilder, AnalysisRegis public void putJob(PutJobAction.Request request, AnalysisRegistry analysisRegistry, ClusterState state, ActionListener actionListener) throws IOException { - request.getJobBuilder().validateAnalysisLimitsAndSetDefaults(maxModelMemoryLimit); - validateCategorizationAnalyzer(request.getJobBuilder(), analysisRegistry); + Job.Builder jobBuilder = request.getJobBuilder(); + jobBuilder.validateAnalysisLimitsAndSetDefaults(maxModelMemoryLimit); + jobBuilder.validateModelSnapshotRetentionSettingsAndSetDefaults(); + validateCategorizationAnalyzer(jobBuilder, analysisRegistry); - Job job = request.getJobBuilder().build(new Date()); + Job job = jobBuilder.build(new Date()); if (job.getDataDescription() != null && job.getDataDescription().getFormat() == DataDescription.DataFormat.DELIMITED) { deprecationLogger.deprecatedAndMaybeLog("ml_create_job_delimited_data", diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 97873464de643..bef38a8e33fae 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -299,7 +299,6 @@ public void onResponse(GetResponse getResponse) { return; } - final long version = getResponse.getVersion(); final long seqNo = getResponse.getSeqNo(); final long primaryTerm = getResponse.getPrimaryTerm(); BytesReference source = getResponse.getSourceAsBytesRef(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index 439db5c21a914..2e8807cbdfd94 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -7,6 +7,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -66,12 +67,14 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener { - if (cutoffEpochMs == null) { + response -> { + if (response == null) { removeData(jobIterator, listener, isTimedOutSupplier); } else { - removeDataBefore(job, cutoffEpochMs, ActionListener.wrap( - response -> removeData(jobIterator, listener, isTimedOutSupplier), + long latestTimeMs = response.v1(); + long cutoffEpochMs = response.v2(); + removeDataBefore(job, latestTimeMs, cutoffEpochMs, ActionListener.wrap( + r -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure)); } }, @@ -84,7 +87,7 @@ private WrappedBatchedJobsIterator newJobIterator() { return new WrappedBatchedJobsIterator(jobsIterator); } - abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener); + abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener> listener); abstract Long getRetentionDays(Job job); @@ -92,7 +95,7 @@ private WrappedBatchedJobsIterator newJobIterator() { * Template method to allow implementation details of various types of data (e.g. results, model snapshots). * Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job. */ - abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener); + abstract void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener listener); static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) { return QueryBuilders.boolQuery() diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index e76f1c7b13052..49ed77a09c0a9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -27,7 +28,6 @@ import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; -import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; @@ -54,6 +54,8 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover private static final Logger LOGGER = LogManager.getLogger(ExpiredModelSnapshotsRemover.class); + private static final long MS_IN_ONE_DAY = TimeValue.timeValueDays(1).getMillis(); + /** * The max number of snapshots to fetch per job. It is set to 10K, the default for an index as * we don't change that in our ML indices. It should be more than enough for most cases. If not, @@ -72,12 +74,19 @@ public ExpiredModelSnapshotsRemover(OriginSettingClient client, ThreadPool threa @Override Long getRetentionDays(Job job) { - return job.getModelSnapshotRetentionDays(); + // If a daily retention cutoff is set then we need to tell the base class that this is the cutoff + // point so that we get to consider deleting model snapshots older than this. Later on we will + // not actually delete all of the ones in between the hard cutoff and the daily retention cutoff. + Long retentionDaysForConsideration = job.getDailyModelSnapshotRetentionAfterDays(); + if (retentionDaysForConsideration == null) { + retentionDaysForConsideration = job.getModelSnapshotRetentionDays(); + } + return retentionDaysForConsideration; } @Override - void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { - ThreadedActionListener threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, + void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener> listener) { + ThreadedActionListener> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false); latestSnapshotTimeStamp(jobId, ActionListener.wrap( @@ -86,7 +95,7 @@ void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener li threadedActionListener.onResponse(null); } else { long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); - threadedActionListener.onResponse(cutoff); + threadedActionListener.onResponse(new Tuple<>(latestTime, cutoff)); } }, listener::onFailure @@ -125,39 +134,53 @@ private void latestSnapshotTimeStamp(String jobId, ActionListener listener } @Override - protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener) { + protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener listener) { + // TODO: delete this test if we ever allow users to revert a job to no model snapshot, e.g. to recover from data loss if (job.getModelSnapshotId() == null) { // No snapshot to remove listener.onResponse(true); return; } - LOGGER.debug("Removing model snapshots of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs); + LOGGER.debug("Considering model snapshots of job [{}] that have a timestamp before [{}] for removal", job.getId(), cutoffEpochMs); SearchRequest searchRequest = new SearchRequest(); searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); QueryBuilder activeSnapshotFilter = QueryBuilders.termQuery( - ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), job.getModelSnapshotId()); + ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), job.getModelSnapshotId()); QueryBuilder retainFilter = QueryBuilders.termQuery(ModelSnapshot.RETAIN.getPreferredName(), true); QueryBuilder query = createQuery(job.getId(), cutoffEpochMs) - .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName())) - .mustNot(activeSnapshotFilter) - .mustNot(retainFilter); + .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName())) + .mustNot(activeSnapshotFilter) + .mustNot(retainFilter); - searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE).sort(ElasticsearchMappings.ES_DOC)); + searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE) + .sort(ModelSnapshot.TIMESTAMP.getPreferredName())); + long deleteAllBeforeMs = (job.getModelSnapshotRetentionDays() == null) + ? 0 : latestTimeMs - TimeValue.timeValueDays(job.getModelSnapshotRetentionDays()).getMillis(); client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool, - MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false)); + MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), deleteAllBeforeMs, listener), false)); } - private ActionListener expiredSnapshotsListener(String jobId, ActionListener listener) { + private ActionListener expiredSnapshotsListener(String jobId, long deleteAllBeforeMs, + ActionListener listener) { return new ActionListener<>() { @Override public void onResponse(SearchResponse searchResponse) { + long nextToKeepMs = deleteAllBeforeMs; try { List modelSnapshots = new ArrayList<>(); for (SearchHit hit : searchResponse.getHits()) { - modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef())); + ModelSnapshot modelSnapshot = ModelSnapshot.fromJson(hit.getSourceRef()); + long timestampMs = modelSnapshot.getTimestamp().getTime(); + if (timestampMs >= nextToKeepMs) { + do { + nextToKeepMs += MS_IN_ONE_DAY; + } while (timestampMs >= nextToKeepMs); + continue; + } + modelSnapshots.add(modelSnapshot); } deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener); } catch (Exception e) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index 16ba77c582e8f..139d22082dbb1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -84,7 +85,7 @@ Long getRetentionDays(Job job) { } @Override - protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener) { + protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener listener) { LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs); DeleteByQueryRequest request = createDBQRequest(job, cutoffEpochMs); @@ -131,8 +132,8 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) { } @Override - void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { - ThreadedActionListener threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, + void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener> listener) { + ThreadedActionListener> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false); latestBucketTime(jobId, ActionListener.wrap( latestTime -> { @@ -140,7 +141,7 @@ void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener li threadedActionListener.onResponse(null); } else { long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); - threadedActionListener.onResponse(cutoff); + threadedActionListener.onResponse(new Tuple<>(latestTime, cutoff)); } }, listener::onFailure diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 0d8955c05774f..8d46ce723919d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -47,7 +48,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase { // We can't test an abstract class so make a concrete class // as simple as possible - private class ConcreteExpiredJobDataRemover extends AbstractExpiredJobDataRemover { + private static class ConcreteExpiredJobDataRemover extends AbstractExpiredJobDataRemover { private int getRetentionDaysCallCount = 0; @@ -62,13 +63,14 @@ protected Long getRetentionDays(Job job) { return randomBoolean() ? null : 0L; } - void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { + @Override + void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener> listener) { long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli(); - listener.onResponse(nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis()); + listener.onResponse(new Tuple<>(nowEpochMs, nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis())); } @Override - protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener) { + protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener listener) { listener.onResponse(Boolean.TRUE); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index a178cd48b7cad..e71f4d5b50cb9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -89,15 +90,17 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException JobTests.buildJobBuilder("job-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() ))); - Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); + Date now = new Date(); + Date oneDayAgo = new Date(now.getTime() - TimeValue.timeValueDays(1).getMillis()); ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "fresh-snapshot", oneDayAgo); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); - Date eightDaysAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(8).getMillis()); - ModelSnapshot snapshotToBeDeleted = createModelSnapshot("job-1", "old-snapshot", eightDaysAgo); + // It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond + Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1); + ModelSnapshot snapshotToBeDeleted = createModelSnapshot("job-1", "old-snapshot", eightDaysAndOneMsAgo); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshotToBeDeleted))); - ModelSnapshot snapshot2_1 = createModelSnapshot("job-1", "snapshots-1_1", eightDaysAgo); + ModelSnapshot snapshot2_1 = createModelSnapshot("job-1", "snapshots-1_1", eightDaysAndOneMsAgo); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_1))); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList())); @@ -127,9 +130,10 @@ public void testRemove_GivenTimeout() throws IOException { JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() ))); - List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), - createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); + Date now = new Date(); + List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1", now), + createModelSnapshot("snapshots-1", "snapshots-1_2", now)); + List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1", now)); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); @@ -173,15 +177,19 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() ))); - ModelSnapshot snapshot1_1 = createModelSnapshot("snapshots-1", "snapshots-1_1"); + Date now = new Date(); + Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); + ModelSnapshot snapshot1_1 = createModelSnapshot("snapshots-1", "snapshots-1_1", oneDayAgo); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + // It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond + Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1); List snapshots1JobSnapshots = Arrays.asList( snapshot1_1, - createModelSnapshot("snapshots-1", "snapshots-1_2")); + createModelSnapshot("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo)); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); - ModelSnapshot snapshot2_2 = createModelSnapshot("snapshots-2", "snapshots-2_1"); + ModelSnapshot snapshot2_2 = createModelSnapshot("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_2))); givenClientDeleteModelSnapshotRequestsFail(searchResponses); @@ -197,7 +205,7 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1)); DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1")); - assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1")); + assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2")); } @SuppressWarnings("unchecked") @@ -211,12 +219,12 @@ public void testCalcCutoffEpochMs() throws IOException { givenClientRequests(searchResponses, true, true); long retentionDays = 3L; - ActionListener cutoffListener = mock(ActionListener.class); + ActionListener> cutoffListener = mock(ActionListener.class); createExpiredModelSnapshotsRemover().calcCutoffEpochMs("job-1", retentionDays, cutoffListener); long dayInMills = 60 * 60 * 24 * 1000; long expectedCutoffTime = oneDayAgo.getTime() - (dayInMills * retentionDays); - verify(cutoffListener).onResponse(eq(expectedCutoffTime)); + verify(cutoffListener).onResponse(eq(new Tuple<>(oneDayAgo.getTime(), expectedCutoffTime))); } private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() { @@ -234,10 +242,6 @@ private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() { return new ExpiredModelSnapshotsRemover(originSettingClient, threadPool); } - private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId) { - return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(new Date()).build(); - } - private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId, Date date) { return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(date).build(); } @@ -269,7 +273,8 @@ public Void answer(InvocationOnMock invocationOnMock) { capturedSearchRequests.add(searchRequest); // Only the last search request should fail if (shouldSearchRequestsSucceed || callCount.get() < searchResponses.size()) { - listener.onResponse(searchResponses.get(callCount.getAndIncrement())); + SearchResponse response = searchResponses.get(callCount.getAndIncrement()); + listener.onResponse(response); } else { listener.onFailure(new RuntimeException("search failed")); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index 29c8dad0c668c..529fff5ac84f2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; @@ -145,12 +146,12 @@ public void testCalcCutoffEpochMs() { givenSearchResponses(Collections.singletonList(JobTests.buildJobBuilder(jobId).setResultsRetentionDays(1L).build()), new Bucket(jobId, latest, 60)); - ActionListener cutoffListener = mock(ActionListener.class); + ActionListener> cutoffListener = mock(ActionListener.class); createExpiredResultsRemover().calcCutoffEpochMs(jobId, 1L, cutoffListener); long dayInMills = 60 * 60 * 24 * 1000; long expectedCutoffTime = latest.getTime() - dayInMills; - verify(cutoffListener).onResponse(eq(expectedCutoffTime)); + verify(cutoffListener).onResponse(eq(new Tuple<>(latest.getTime(), expectedCutoffTime))); } private void givenDBQRequestsSucceed() { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml index af6f349930193..b5cfa75a5c2c0 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml @@ -172,6 +172,44 @@ ml.get_jobs: job_id: "non-existing" +--- +"Test put job with inconsistent model snapshot settings": + - do: + catch: /The value of daily_model_snapshot_retention_after_days \[4\] cannot be greater than model_snapshot_retention_days \[3\]/ + ml.put_job: + job_id: inconsistent-snapshot-settings-1 + body: > + { + "description":"Analysis of response time by airline", + "analysis_config" : { + "bucket_span": "1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "time_field":"@timestamp" + }, + "model_snapshot_retention_days": 3, + "daily_model_snapshot_retention_after_days": 4 + } + + - do: + catch: /daily_model_snapshot_retention_after_days cannot be less than 0. Value = -1/ + ml.put_job: + job_id: inconsistent-snapshot-settings-2 + body: > + { + "description":"Analysis of response time by airline", + "analysis_config" : { + "bucket_span": "1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "time_field":"@timestamp" + }, + "model_snapshot_retention_days": 3, + "daily_model_snapshot_retention_after_days": -1 + } + --- "Test put job with inconsistent body/param ids": - do: @@ -297,6 +335,7 @@ "renormalization_window_days": 1, "background_persist_interval": "2h", "model_snapshot_retention_days": 3, + "daily_model_snapshot_retention_after_days": 2, "results_retention_days": 4, "custom_settings": { "setting1": "custom1", @@ -363,6 +402,7 @@ - match: { renormalization_window_days: 10 } - match: { background_persist_interval: "3h" } - match: { model_snapshot_retention_days: 30 } + - match: { daily_model_snapshot_retention_after_days: 2 } - match: { results_retention_days: 40 } - do: @@ -403,6 +443,24 @@ } - match: { analysis_limits.model_memory_limit: "15mb" } + - do: + catch: /The value of daily_model_snapshot_retention_after_days \[31\] cannot be greater than model_snapshot_retention_days \[30\]/ + ml.update_job: + job_id: jobs-crud-update-job + body: > + { + "daily_model_snapshot_retention_after_days": 31 + } + + - do: + catch: /The value of daily_model_snapshot_retention_after_days \[2\] cannot be greater than model_snapshot_retention_days \[1\]/ + ml.update_job: + job_id: jobs-crud-update-job + body: > + { + "model_snapshot_retention_days": 1 + } + - do: catch: bad_request ml.update_job: diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/ml_info.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/ml_info.yml index d3e3b00223464..3478a0bc8f3c7 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/ml_info.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/ml_info.yml @@ -13,7 +13,8 @@ teardown: - match: { defaults.anomaly_detectors.categorization_analyzer.tokenizer: "ml_classic" } - match: { defaults.anomaly_detectors.model_memory_limit: "1gb" } - match: { defaults.anomaly_detectors.categorization_examples_limit: 4 } - - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 } + - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 10 } + - match: { defaults.anomaly_detectors.daily_model_snapshot_retention_after_days: 1 } - match: { defaults.datafeeds.scroll_size: 1000 } - is_false: limits.max_model_memory_limit # We cannot assert an exact value for the next one as it will vary depending on the test machine @@ -31,7 +32,8 @@ teardown: - match: { defaults.anomaly_detectors.categorization_analyzer.tokenizer: "ml_classic" } - match: { defaults.anomaly_detectors.model_memory_limit: "512mb" } - match: { defaults.anomaly_detectors.categorization_examples_limit: 4 } - - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 } + - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 10 } + - match: { defaults.anomaly_detectors.daily_model_snapshot_retention_after_days: 1 } - match: { defaults.datafeeds.scroll_size: 1000 } - match: { limits.max_model_memory_limit: "512mb" } # We cannot assert an exact value for the next one as it will vary depending on the test machine @@ -49,7 +51,8 @@ teardown: - match: { defaults.anomaly_detectors.categorization_analyzer.tokenizer: "ml_classic" } - match: { defaults.anomaly_detectors.model_memory_limit: "1gb" } - match: { defaults.anomaly_detectors.categorization_examples_limit: 4 } - - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 } + - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 10 } + - match: { defaults.anomaly_detectors.daily_model_snapshot_retention_after_days: 1 } - match: { defaults.datafeeds.scroll_size: 1000 } - match: { limits.max_model_memory_limit: "6gb" } # We cannot assert an exact value for the next one as it will vary depending on the test machine @@ -67,7 +70,8 @@ teardown: - match: { defaults.anomaly_detectors.categorization_analyzer.tokenizer: "ml_classic" } - match: { defaults.anomaly_detectors.model_memory_limit: "1gb" } - match: { defaults.anomaly_detectors.categorization_examples_limit: 4 } - - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 } + - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 10 } + - match: { defaults.anomaly_detectors.daily_model_snapshot_retention_after_days: 1 } - match: { defaults.datafeeds.scroll_size: 1000 } - match: { limits.max_model_memory_limit: "6gb" } # We cannot assert an exact value for the next one as it will vary depending on the test machine @@ -85,7 +89,8 @@ teardown: - match: { defaults.anomaly_detectors.categorization_analyzer.tokenizer: "ml_classic" } - match: { defaults.anomaly_detectors.model_memory_limit: "1mb" } - match: { defaults.anomaly_detectors.categorization_examples_limit: 4 } - - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 } + - match: { defaults.anomaly_detectors.model_snapshot_retention_days: 10 } + - match: { defaults.anomaly_detectors.daily_model_snapshot_retention_after_days: 1 } - match: { defaults.datafeeds.scroll_size: 1000 } - match: { limits.max_model_memory_limit: "1mb" } # This time we can assert an exact value for the next one because the hard limit is so low From 3e7bf0e0a6c26886920e968367795e41d1c0d52a Mon Sep 17 00:00:00 2001 From: lcawl Date: Mon, 4 May 2020 12:40:07 -0700 Subject: [PATCH 2/6] [DOCS] Updates retention descriptions --- docs/reference/ml/ml-shared.asciidoc | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index faa46d5e5c1a1..0e38f9a864ec2 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -362,16 +362,15 @@ example, it can contain custom URL information as shown in end::custom-settings[] tag::daily-model-snapshot-retention-after-days[] -Advanced configuration option. A number of days in between 0 and the value of -`model_snapshot_retention_days` after which only one model snapshot per day -is retained instead of all model snapshots. Age is calculated relative to the -timestamp of the newest model snapshot. The default value is `1` for newly -created jobs, which means that all snapshots are retained for one day but -older snapshots are thinned out such that only one per day is retained until -`model_snapshot_retention_days`. For jobs that were created before this -setting was available, the default is the same as that job's -`model_snapshot_retention_days` setting, to preserve the original behavior -that no thinning out of model snapshots is done. +Advanced configuration option. Specifies a number of days between 0 and the +value of `model_snapshot_retention_days`. After this period of time, only one +model snapshot per day is retained for this job. Age is calculated relative to +the timestamp of the newest model snapshot. For new jobs, the default value is +`1`, which means that all snapshots are retained for one day. Older snapshots +are thinned out such that only one per day is retained. For jobs that were +created before this setting was available, the default value matches the +`model_snapshot_retention_days` value, which preserves the original behavior +and no thinning out of model snapshots occurs. end::daily-model-snapshot-retention-after-days[] tag::data-description[] @@ -1010,8 +1009,8 @@ end::model-snapshot-id[] tag::model-snapshot-retention-days[] Advanced configuration option. The period of time (in days) that model snapshots are retained. Age is calculated relative to the timestamp of the newest model -snapshot. The default value is `1`, which means snapshots that are one day -(twenty-four hours) older than the newest snapshot are deleted. +snapshot. The default value is `10`, which means snapshots that are ten days +older than the newest snapshot are deleted. end::model-snapshot-retention-days[] tag::model-timestamp[] From ee14fd324c0be7fb244975499d34700bf0fd7b4c Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 5 May 2020 09:32:49 +0100 Subject: [PATCH 3/6] Update docs/reference/ml/ml-shared.asciidoc Co-authored-by: Lisa Cawley --- docs/reference/ml/ml-shared.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index 0e38f9a864ec2..279daffcb15e6 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -363,7 +363,7 @@ end::custom-settings[] tag::daily-model-snapshot-retention-after-days[] Advanced configuration option. Specifies a number of days between 0 and the -value of `model_snapshot_retention_days`. After this period of time, only one +value of `model_snapshot_retention_days`. After this period of time, only the first model snapshot per day is retained for this job. Age is calculated relative to the timestamp of the newest model snapshot. For new jobs, the default value is `1`, which means that all snapshots are retained for one day. Older snapshots From 7951a77f19af480e2375218944eba18e2c3d0679 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 5 May 2020 09:37:59 +0100 Subject: [PATCH 4/6] Fixing tests --- .../test/java/org/elasticsearch/client/MachineLearningIT.java | 4 +++- .../xpack/core/ml/job/results/ReservedFieldNames.java | 1 + .../elasticsearch/xpack/core/ml/config_index_mappings.json | 3 +++ x-pack/plugin/ml/qa/ml-with-security/build.gradle | 1 + 4 files changed, 8 insertions(+), 1 deletion(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index e86c8c2f9b0c3..d9870c83c677c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -2517,10 +2517,12 @@ private static Job buildJobForExpiredDataTests(String jobId) { .setFunction("count") .setDetectorDescription(randomAlphaOfLength(10)) .build(); - AnalysisConfig.Builder configBuilder = new AnalysisConfig.Builder(Arrays.asList(detector)); + AnalysisConfig.Builder configBuilder = new AnalysisConfig.Builder(Collections.singletonList(detector)); //should not be random, see:https://github.com/elastic/ml-cpp/issues/208 configBuilder.setBucketSpan(new TimeValue(1, TimeUnit.HOURS)); builder.setAnalysisConfig(configBuilder); + builder.setModelSnapshotRetentionDays(1L); + builder.setDailyModelSnapshotRetentionAfterDays(1L); DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setTimeFormat(DataDescription.EPOCH_MS); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java index 1db272ec157a1..b406ce85f69cd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java @@ -231,6 +231,7 @@ public final class ReservedFieldNames { Job.RENORMALIZATION_WINDOW_DAYS.getPreferredName(), Job.BACKGROUND_PERSIST_INTERVAL.getPreferredName(), Job.MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName(), + Job.DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS.getPreferredName(), Job.RESULTS_RETENTION_DAYS.getPreferredName(), Job.MODEL_SNAPSHOT_ID.getPreferredName(), Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), diff --git a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json index fe0c559b12245..6dc4c6872f57c 100644 --- a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json @@ -229,6 +229,9 @@ "type" : "object", "enabled" : false }, + "daily_model_snapshot_retention_after_days" : { + "type" : "long" + }, "data_description" : { "properties" : { "field_delimiter" : { diff --git a/x-pack/plugin/ml/qa/ml-with-security/build.gradle b/x-pack/plugin/ml/qa/ml-with-security/build.gradle index 2de640d77330e..48c055912ca2e 100644 --- a/x-pack/plugin/ml/qa/ml-with-security/build.gradle +++ b/x-pack/plugin/ml/qa/ml-with-security/build.gradle @@ -157,6 +157,7 @@ integTest.runner { 'ml/jobs_crud/Test put job after closing results index', 'ml/jobs_crud/Test put job after closing state index', 'ml/jobs_crud/Test put job with inconsistent body/param ids', + 'ml/jobs_crud/Test put job with inconsistent model snapshot settings', 'ml/jobs_crud/Test put job with time field in analysis_config', 'ml/jobs_crud/Test put job with duplicate detector configurations', 'ml/jobs_crud/Test job with categorization_analyzer and categorization_filters', From 3201549168e5524398a96b6db160a92419339403 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 5 May 2020 10:58:31 +0100 Subject: [PATCH 5/6] Use a dedicated class instead of Tuple --- .../AbstractExpiredJobDataRemover.java | 45 ++++++++++++++++--- .../ExpiredModelSnapshotsRemover.java | 7 ++- .../job/retention/ExpiredResultsRemover.java | 7 ++- .../AbstractExpiredJobDataRemoverTests.java | 5 +-- .../ExpiredModelSnapshotsRemoverTests.java | 5 +-- .../retention/ExpiredResultsRemoverTests.java | 5 +-- 6 files changed, 51 insertions(+), 23 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index 2e8807cbdfd94..a7ebb8e70843e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -7,7 +7,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.OriginSettingClient; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -19,6 +18,7 @@ import java.util.Deque; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -71,9 +71,7 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure)); } @@ -87,7 +85,7 @@ private WrappedBatchedJobsIterator newJobIterator() { return new WrappedBatchedJobsIterator(jobsIterator); } - abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener> listener); + abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener); abstract Long getRetentionDays(Job job); @@ -109,7 +107,7 @@ static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) { * This class abstracts away the logic of pulling one job at a time from * multiple batches. */ - private class WrappedBatchedJobsIterator implements Iterator { + private static class WrappedBatchedJobsIterator implements Iterator { private final BatchedJobsIterator batchedIterator; private VolatileCursorIterator currentBatch; @@ -147,4 +145,39 @@ private VolatileCursorIterator createBatchIteratorFromBatch(Deque(jobs); } } + + /** + * The latest time that cutoffs are measured from is not wall clock time, + * but some other reference point that makes sense for the type of data + * being removed. This class groups the cutoff time with it's "latest" + * reference point. + */ + protected static final class CutoffDetails { + + public long latestTimeMs; + public long cutoffEpochMs; + + public CutoffDetails(long latestTimeMs, long cutoffEpochMs) { + this.latestTimeMs = latestTimeMs; + this.cutoffEpochMs = cutoffEpochMs; + } + + @Override + public int hashCode() { + return Objects.hash(latestTimeMs, cutoffEpochMs); + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + if (other instanceof CutoffDetails == false) { + return false; + } + CutoffDetails that = (CutoffDetails) other; + return this.latestTimeMs == that.latestTimeMs && + this.cutoffEpochMs == that.cutoffEpochMs; + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 49ed77a09c0a9..d6808af8bd1af 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -15,7 +15,6 @@ import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.OriginSettingClient; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -85,8 +84,8 @@ Long getRetentionDays(Job job) { } @Override - void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener> listener) { - ThreadedActionListener> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, + void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { + ThreadedActionListener threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false); latestSnapshotTimeStamp(jobId, ActionListener.wrap( @@ -95,7 +94,7 @@ void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener(latestTime, cutoff)); + threadedActionListener.onResponse(new CutoffDetails(latestTime, cutoff)); } }, listener::onFailure diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index 139d22082dbb1..54b0d1f54753f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.OriginSettingClient; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -132,8 +131,8 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) { } @Override - void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener> listener) { - ThreadedActionListener> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, + void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { + ThreadedActionListener threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false); latestBucketTime(jobId, ActionListener.wrap( latestTime -> { @@ -141,7 +140,7 @@ void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener(latestTime, cutoff)); + threadedActionListener.onResponse(new CutoffDetails(latestTime, cutoff)); } }, listener::onFailure diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 8d46ce723919d..8dc5bec70fbe5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -12,7 +12,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -64,9 +63,9 @@ protected Long getRetentionDays(Job job) { } @Override - void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener> listener) { + void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli(); - listener.onResponse(new Tuple<>(nowEpochMs, nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis())); + listener.onResponse(new CutoffDetails(nowEpochMs, nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis())); } @Override diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index e71f4d5b50cb9..f48ad5a7e1225 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -219,12 +218,12 @@ public void testCalcCutoffEpochMs() throws IOException { givenClientRequests(searchResponses, true, true); long retentionDays = 3L; - ActionListener> cutoffListener = mock(ActionListener.class); + ActionListener cutoffListener = mock(ActionListener.class); createExpiredModelSnapshotsRemover().calcCutoffEpochMs("job-1", retentionDays, cutoffListener); long dayInMills = 60 * 60 * 24 * 1000; long expectedCutoffTime = oneDayAgo.getTime() - (dayInMills * retentionDays); - verify(cutoffListener).onResponse(eq(new Tuple<>(oneDayAgo.getTime(), expectedCutoffTime))); + verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(oneDayAgo.getTime(), expectedCutoffTime))); } private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index 529fff5ac84f2..b1691baca0c79 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; @@ -146,12 +145,12 @@ public void testCalcCutoffEpochMs() { givenSearchResponses(Collections.singletonList(JobTests.buildJobBuilder(jobId).setResultsRetentionDays(1L).build()), new Bucket(jobId, latest, 60)); - ActionListener> cutoffListener = mock(ActionListener.class); + ActionListener cutoffListener = mock(ActionListener.class); createExpiredResultsRemover().calcCutoffEpochMs(jobId, 1L, cutoffListener); long dayInMills = 60 * 60 * 24 * 1000; long expectedCutoffTime = latest.getTime() - dayInMills; - verify(cutoffListener).onResponse(eq(new Tuple<>(latest.getTime(), expectedCutoffTime))); + verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(latest.getTime(), expectedCutoffTime))); } private void givenDBQRequestsSucceed() { From c3f6a8477a99e16633c08c9f07ff2d9f70e1d71a Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 5 May 2020 11:51:16 +0100 Subject: [PATCH 6/6] Make members final --- .../xpack/ml/job/retention/AbstractExpiredJobDataRemover.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index a7ebb8e70843e..0173f5b27d0d6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -154,8 +154,8 @@ private VolatileCursorIterator createBatchIteratorFromBatch(Deque