diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index dbebc580b5730..80d5c4d02a24b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -135,7 +135,7 @@ public final class Messages { "Adjust the analysis_limits.model_memory_limit setting to ensure all data is analyzed"; public static final String JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT_PRE_7_2 = "Job memory status changed to hard_limit at {0}; adjust the " + "analysis_limits.model_memory_limit setting to ensure all data is analyzed"; - public static final String JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES = "{0} categories observed in the first [{1}] buckets." + + public static final String JOB_AUDIT_CATEGORIZATION_STATUS_WARN = "categorization_status changed to [{0}] after [{1}] buckets." + " This suggests an inappropriate categorization_field_name has been chosen."; public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "categorization_filters contain duplicates"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java index 51043d4da42bd..4c6de562156e9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java @@ -74,9 +74,6 @@ public class AutodetectResultProcessor { private static final Logger LOGGER = LogManager.getLogger(AutodetectResultProcessor.class); - static final long EARLY_BUCKET_THRESHOLD = 100; - static final int EXCESSIVE_EARLY_CATEGORY_COUNT = 1000; - private final Client client; private final AnomalyDetectionAuditor auditor; private final String jobId; @@ -90,9 +87,8 @@ public class AutodetectResultProcessor { private final FlushListener flushListener; private volatile boolean processKilled; private volatile boolean failed; - private long priorRunsBucketCount; + private final long priorRunsBucketCount; private long currentRunBucketCount; // only used from the process() thread, so doesn't need to be volatile - private boolean excessiveCategoryWarningIssued; // only used from the process() thread, so doesn't need to be volatile private final JobResultsPersister.Builder bulkResultsPersister; private boolean deleteInterimRequired; @@ -230,7 +226,7 @@ void processResult(AutodetectResult result) { } CategoryDefinition categoryDefinition = result.getCategoryDefinition(); if (categoryDefinition != null) { - processCategoryDefinition(categoryDefinition); + persister.persistCategoryDefinition(categoryDefinition, this::isAlive); } ModelPlot modelPlot = result.getModelPlot(); if (modelPlot != null) { @@ -314,22 +310,6 @@ void processResult(AutodetectResult result) { } } - private void processCategoryDefinition(CategoryDefinition categoryDefinition) { - persister.persistCategoryDefinition(categoryDefinition, this::isAlive); - if (categoryDefinition.getCategoryId() == EXCESSIVE_EARLY_CATEGORY_COUNT && - priorRunsBucketCount + currentRunBucketCount < EARLY_BUCKET_THRESHOLD && - excessiveCategoryWarningIssued == false) { - auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES, EXCESSIVE_EARLY_CATEGORY_COUNT, - // Add 1 because category definitions are written before buckets - 1L + priorRunsBucketCount + currentRunBucketCount)); - // This flag won't be retained if the job is closed and reopened, or if the job migrates to another node. - // This means it's possible the audit message is generated multiple times. However, that's not a - // disaster, and is also very unlikely in the the (best practice) cases where initial lookback covers - // more than 100 buckets. - excessiveCategoryWarningIssued = true; - } - } - private void processModelSizeStats(ModelSizeStats modelSizeStats) { LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}", jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(), @@ -338,6 +318,8 @@ private void processModelSizeStats(ModelSizeStats modelSizeStats) { persister.persistModelSizeStats(modelSizeStats, this::isAlive); notifyModelMemoryStatusChange(modelSizeStats); + notifyCategorizationStatusChange(modelSizeStats); + latestModelSizeStats = modelSizeStats; } @@ -359,6 +341,16 @@ private void notifyModelMemoryStatusChange(ModelSizeStats modelSizeStats) { } } + private void notifyCategorizationStatusChange(ModelSizeStats modelSizeStats) { + ModelSizeStats.CategorizationStatus categorizationStatus = modelSizeStats.getCategorizationStatus(); + if (categorizationStatus != latestModelSizeStats.getCategorizationStatus()) { + if (categorizationStatus == ModelSizeStats.CategorizationStatus.WARN) { + auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, categorizationStatus, + priorRunsBucketCount + currentRunBucketCount)); + } + } + } + protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build(); UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java index d7375ac9a8481..83ae4b4ecf6af 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java @@ -17,7 +17,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; @@ -52,14 +51,12 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doThrow; @@ -213,65 +210,6 @@ public void testProcessResult_categoryDefinition() { verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); } - public void testProcessResult_excessiveCategoryDefinitionCountEarly() { - int iterations = 3; - int categoryCount = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2; - - processorUnderTest.setDeleteInterimRequired(false); - - AutodetectResult result = mock(AutodetectResult.class); - for (int iteration = 1; iteration <= iterations; ++iteration) { - for (int categoryId = 1; categoryId <= categoryCount; ++categoryId) { - CategoryDefinition categoryDefinition = new CategoryDefinition(JOB_ID); - categoryDefinition.setCategoryId(categoryId); - when(result.getCategoryDefinition()).thenReturn(categoryDefinition); - - processorUnderTest.processResult(result); - } - } - - verify(bulkBuilder, never()).executeRequest(); - verify(persister, times(iterations * categoryCount)).persistCategoryDefinition(any(CategoryDefinition.class), any()); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); - verify(auditor).warning(eq(JOB_ID), eq(Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES, - AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT, 1))); - } - - public void testProcessResult_highCategoryDefinitionCountLateOn() { - int iterations = 3; - int categoryCount = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2; - - processorUnderTest.setDeleteInterimRequired(false); - - when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder); - when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); - - AutodetectResult bucketResult = mock(AutodetectResult.class); - final int numPriorBuckets = (int) AutodetectResultProcessor.EARLY_BUCKET_THRESHOLD + 1; - for (int i = 0; i < numPriorBuckets; ++i) { - Bucket bucket = new Bucket(JOB_ID, new Date(i * 1000 + 1000000), BUCKET_SPAN_MS); - when(bucketResult.getBucket()).thenReturn(bucket); - processorUnderTest.processResult(bucketResult); - } - - AutodetectResult categoryResult = mock(AutodetectResult.class); - for (int iteration = 1; iteration <= iterations; ++iteration) { - for (int categoryId = 1; categoryId <= categoryCount; ++categoryId) { - CategoryDefinition categoryDefinition = new CategoryDefinition(JOB_ID); - categoryDefinition.setCategoryId(categoryId); - when(categoryResult.getCategoryDefinition()).thenReturn(categoryDefinition); - processorUnderTest.processResult(categoryResult); - } - } - - verify(bulkBuilder).persistTimingStats(any(TimingStats.class)); - verify(bulkBuilder, times(numPriorBuckets)).persistBucket(any(Bucket.class)); - verify(bulkBuilder, times(numPriorBuckets)).executeRequest(); - verify(persister, times(iterations * categoryCount)).persistCategoryDefinition(any(CategoryDefinition.class), any()); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); - verify(auditor, never()).warning(eq(JOB_ID), anyString()); - } - public void testProcessResult_flushAcknowledgement() { AutodetectResult result = mock(AutodetectResult.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); @@ -335,11 +273,6 @@ public void testProcessResult_modelSizeStats() { } public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() { - TimeValue delay = TimeValue.timeValueSeconds(5); - // Set up schedule delay time - when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString())) - .thenAnswer(i -> executor.schedule((Runnable) i.getArguments()[0], delay.nanos(), TimeUnit.NANOSECONDS)); - AutodetectResult result = mock(AutodetectResult.class); processorUnderTest.setDeleteInterimRequired(false); @@ -374,6 +307,33 @@ public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() { verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb", "1kb")); } + public void testProcessResult_modelSizeStatsWithCategorizationStatusChanges() { + AutodetectResult result = mock(AutodetectResult.class); + processorUnderTest.setDeleteInterimRequired(false); + + // First one with ok + ModelSizeStats modelSizeStats = + new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(ModelSizeStats.CategorizationStatus.OK).build(); + when(result.getModelSizeStats()).thenReturn(modelSizeStats); + processorUnderTest.processResult(result); + + // Now one with warn + modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(ModelSizeStats.CategorizationStatus.WARN).build(); + when(result.getModelSizeStats()).thenReturn(modelSizeStats); + processorUnderTest.processResult(result); + + // Another with warn + modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(ModelSizeStats.CategorizationStatus.WARN).build(); + when(result.getModelSizeStats()).thenReturn(modelSizeStats); + processorUnderTest.processResult(result); + + + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister, times(3)).persistModelSizeStats(any(ModelSizeStats.class), any()); + // We should have only fired one notification; only the change from ok to warn should have fired, not the subsequent warn + verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, "warn", 0)); + } + public void testProcessResult_modelSnapshot() { AutodetectResult result = mock(AutodetectResult.class); ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID)