Skip to content

Commit

Permalink
[ML] Switch poor categorization audit warning to use status field
Browse files Browse the repository at this point in the history
In elastic#51146 a rudimentary check for poor categorization was added to
7.6.

This change replaces that warning based on a Java-side check with
a new one based on the categorization_status field that the ML C++
sets.  categorization_status was added in 7.7 and above by elastic#51879,
so this new warning based on more advanced conditions will also be
in 7.7 and above.

Closes elastic#50749
  • Loading branch information
droberts195 committed Feb 11, 2020
1 parent 72b84ad commit 5f6078e
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public final class Messages {
"Adjust the analysis_limits.model_memory_limit setting to ensure all data is analyzed";
public static final String JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT_PRE_7_2 = "Job memory status changed to hard_limit at {0}; adjust the " +
"analysis_limits.model_memory_limit setting to ensure all data is analyzed";
public static final String JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES = "{0} categories observed in the first [{1}] buckets." +
public static final String JOB_AUDIT_CATEGORIZATION_STATUS_WARN = "categorization_status changed to [{0}] after [{1}] buckets." +
" This suggests an inappropriate categorization_field_name has been chosen.";

public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "categorization_filters contain duplicates";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ public class AutodetectResultProcessor {

private static final Logger LOGGER = LogManager.getLogger(AutodetectResultProcessor.class);

static final long EARLY_BUCKET_THRESHOLD = 100;
static final int EXCESSIVE_EARLY_CATEGORY_COUNT = 1000;

private final Client client;
private final AnomalyDetectionAuditor auditor;
private final String jobId;
Expand All @@ -90,9 +87,8 @@ public class AutodetectResultProcessor {
private final FlushListener flushListener;
private volatile boolean processKilled;
private volatile boolean failed;
private long priorRunsBucketCount;
private final long priorRunsBucketCount;
private long currentRunBucketCount; // only used from the process() thread, so doesn't need to be volatile
private boolean excessiveCategoryWarningIssued; // only used from the process() thread, so doesn't need to be volatile
private final JobResultsPersister.Builder bulkResultsPersister;
private boolean deleteInterimRequired;

Expand Down Expand Up @@ -230,7 +226,7 @@ void processResult(AutodetectResult result) {
}
CategoryDefinition categoryDefinition = result.getCategoryDefinition();
if (categoryDefinition != null) {
processCategoryDefinition(categoryDefinition);
persister.persistCategoryDefinition(categoryDefinition, this::isAlive);
}
ModelPlot modelPlot = result.getModelPlot();
if (modelPlot != null) {
Expand Down Expand Up @@ -314,22 +310,6 @@ void processResult(AutodetectResult result) {
}
}

private void processCategoryDefinition(CategoryDefinition categoryDefinition) {
persister.persistCategoryDefinition(categoryDefinition, this::isAlive);
if (categoryDefinition.getCategoryId() == EXCESSIVE_EARLY_CATEGORY_COUNT &&
priorRunsBucketCount + currentRunBucketCount < EARLY_BUCKET_THRESHOLD &&
excessiveCategoryWarningIssued == false) {
auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES, EXCESSIVE_EARLY_CATEGORY_COUNT,
// Add 1 because category definitions are written before buckets
1L + priorRunsBucketCount + currentRunBucketCount));
// This flag won't be retained if the job is closed and reopened, or if the job migrates to another node.
// This means it's possible the audit message is generated multiple times. However, that's not a
// disaster, and is also very unlikely in the the (best practice) cases where initial lookback covers
// more than 100 buckets.
excessiveCategoryWarningIssued = true;
}
}

private void processModelSizeStats(ModelSizeStats modelSizeStats) {
LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}",
jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(),
Expand All @@ -338,6 +318,8 @@ private void processModelSizeStats(ModelSizeStats modelSizeStats) {

persister.persistModelSizeStats(modelSizeStats, this::isAlive);
notifyModelMemoryStatusChange(modelSizeStats);
notifyCategorizationStatusChange(modelSizeStats);

latestModelSizeStats = modelSizeStats;
}

Expand All @@ -359,6 +341,16 @@ private void notifyModelMemoryStatusChange(ModelSizeStats modelSizeStats) {
}
}

private void notifyCategorizationStatusChange(ModelSizeStats modelSizeStats) {
ModelSizeStats.CategorizationStatus categorizationStatus = modelSizeStats.getCategorizationStatus();
if (categorizationStatus != latestModelSizeStats.getCategorizationStatus()) {
if (categorizationStatus == ModelSizeStats.CategorizationStatus.WARN) {
auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, categorizationStatus,
priorRunsBucketCount + currentRunBucketCount));
}
}
}

protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {
JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build();
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -52,14 +51,12 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -213,65 +210,6 @@ public void testProcessResult_categoryDefinition() {
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
}

public void testProcessResult_excessiveCategoryDefinitionCountEarly() {
int iterations = 3;
int categoryCount = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2;

processorUnderTest.setDeleteInterimRequired(false);

AutodetectResult result = mock(AutodetectResult.class);
for (int iteration = 1; iteration <= iterations; ++iteration) {
for (int categoryId = 1; categoryId <= categoryCount; ++categoryId) {
CategoryDefinition categoryDefinition = new CategoryDefinition(JOB_ID);
categoryDefinition.setCategoryId(categoryId);
when(result.getCategoryDefinition()).thenReturn(categoryDefinition);

processorUnderTest.processResult(result);
}
}

verify(bulkBuilder, never()).executeRequest();
verify(persister, times(iterations * categoryCount)).persistCategoryDefinition(any(CategoryDefinition.class), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(auditor).warning(eq(JOB_ID), eq(Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES,
AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT, 1)));
}

public void testProcessResult_highCategoryDefinitionCountLateOn() {
int iterations = 3;
int categoryCount = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2;

processorUnderTest.setDeleteInterimRequired(false);

when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);

AutodetectResult bucketResult = mock(AutodetectResult.class);
final int numPriorBuckets = (int) AutodetectResultProcessor.EARLY_BUCKET_THRESHOLD + 1;
for (int i = 0; i < numPriorBuckets; ++i) {
Bucket bucket = new Bucket(JOB_ID, new Date(i * 1000 + 1000000), BUCKET_SPAN_MS);
when(bucketResult.getBucket()).thenReturn(bucket);
processorUnderTest.processResult(bucketResult);
}

AutodetectResult categoryResult = mock(AutodetectResult.class);
for (int iteration = 1; iteration <= iterations; ++iteration) {
for (int categoryId = 1; categoryId <= categoryCount; ++categoryId) {
CategoryDefinition categoryDefinition = new CategoryDefinition(JOB_ID);
categoryDefinition.setCategoryId(categoryId);
when(categoryResult.getCategoryDefinition()).thenReturn(categoryDefinition);
processorUnderTest.processResult(categoryResult);
}
}

verify(bulkBuilder).persistTimingStats(any(TimingStats.class));
verify(bulkBuilder, times(numPriorBuckets)).persistBucket(any(Bucket.class));
verify(bulkBuilder, times(numPriorBuckets)).executeRequest();
verify(persister, times(iterations * categoryCount)).persistCategoryDefinition(any(CategoryDefinition.class), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(auditor, never()).warning(eq(JOB_ID), anyString());
}

public void testProcessResult_flushAcknowledgement() {
AutodetectResult result = mock(AutodetectResult.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
Expand Down Expand Up @@ -335,11 +273,6 @@ public void testProcessResult_modelSizeStats() {
}

public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() {
TimeValue delay = TimeValue.timeValueSeconds(5);
// Set up schedule delay time
when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString()))
.thenAnswer(i -> executor.schedule((Runnable) i.getArguments()[0], delay.nanos(), TimeUnit.NANOSECONDS));

AutodetectResult result = mock(AutodetectResult.class);
processorUnderTest.setDeleteInterimRequired(false);

Expand Down Expand Up @@ -374,6 +307,33 @@ public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() {
verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb", "1kb"));
}

public void testProcessResult_modelSizeStatsWithCategorizationStatusChanges() {
AutodetectResult result = mock(AutodetectResult.class);
processorUnderTest.setDeleteInterimRequired(false);

// First one with ok
ModelSizeStats modelSizeStats =
new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(ModelSizeStats.CategorizationStatus.OK).build();
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(result);

// Now one with warn
modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(ModelSizeStats.CategorizationStatus.WARN).build();
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(result);

// Another with warn
modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(ModelSizeStats.CategorizationStatus.WARN).build();
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(result);


verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister, times(3)).persistModelSizeStats(any(ModelSizeStats.class), any());
// We should have only fired one notification; only the change from ok to warn should have fired, not the subsequent warn
verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, "warn", 0));
}

public void testProcessResult_modelSnapshot() {
AutodetectResult result = mock(AutodetectResult.class);
ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID)
Expand Down

0 comments on commit 5f6078e

Please sign in to comment.