Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Add graceful retry for anomaly detector result indexing failures #49508

Merged
merged 20 commits into from
Dec 12, 2019
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
19c67d6
[ML] Add graceful retry for bulk index results failures
benwtrent Nov 22, 2019
1877d4e
fixing test, removing unused import
benwtrent Nov 25, 2019
6d59172
moving retries to individual actions in `processResult` so result pro…
benwtrent Nov 25, 2019
ed4d9a1
clear out bulk request if not able to persist after retrying
benwtrent Nov 25, 2019
ff2b984
Merge branch 'master' into feature/ml-persist-results-retry
elasticmachine Nov 25, 2019
1bf708e
removing successful bulk response items from subsequent bulk requests
benwtrent Nov 25, 2019
02c220c
Merge branch 'master' into feature/ml-persist-results-retry
benwtrent Dec 3, 2019
791c570
adding retries for all synchronous persistent calls
benwtrent Dec 3, 2019
bf749d7
Fixing refresh policy handling bug and missing settings
benwtrent Dec 4, 2019
0e01259
fixing datafeed timing stats persistence
benwtrent Dec 4, 2019
4710635
rewriting datacounts reporter to use retries
benwtrent Dec 4, 2019
7e4642f
cleanup and addressing pr comments
benwtrent Dec 9, 2019
cb8a5b2
using builder where possible
benwtrent Dec 9, 2019
7814ed2
adjusing backoff calculation
benwtrent Dec 9, 2019
fbd717a
Merge branch 'master' into feature/ml-persist-results-retry
elasticmachine Dec 9, 2019
724dfcf
disallow int wrapping
benwtrent Dec 9, 2019
cb03f38
Merge branch 'feature/ml-persist-results-retry' of github.com:benwtre…
benwtrent Dec 9, 2019
756be6c
Adding auditor messaging indicating failures and retries
benwtrent Dec 9, 2019
6e1a25a
addressing PR comments
benwtrent Dec 10, 2019
a9fba44
Merge branch 'master' into feature/ml-persist-results-retry
elasticmachine Dec 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.junit.After;
import org.junit.Before;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
import static org.hamcrest.Matchers.greaterThan;

public class BulkFailureRetryIT extends MlNativeAutodetectIntegTestCase {

private final String index = "bulk-failure-retry";
private long now = System.currentTimeMillis();
private static long DAY = Duration.ofDays(1).toMillis();
private final String jobId = "bulk-failure-retry-job";
private final String resultsIndex = ".ml-anomalies-custom-bulk-failure-retry-job";

@Before
public void putPastDataIntoIndex() {
client().admin().indices().prepareCreate(index)
.addMapping("type", "time", "type=date", "value", "type=long")
.get();
long twoDaysAgo = now - DAY * 2;
long threeDaysAgo = now - DAY * 3;
writeData(logger, index, 250, threeDaysAgo, twoDaysAgo);
}

@After
public void cleanUpTest() {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it have a potential of affecting other, unrelated tests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see the @After clause. It sets all back to null

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this part. My question was more along the lines if it is possible that 2 test classes will share these cluster settings. But I guess it's not the case.

.putNull("xpack.ml.persist_results_max_retries")
.putNull("logger.org.elasticsearch.xpack.ml.datafeed.DatafeedJob")
.putNull("logger.org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister")
.putNull("logger.org.elasticsearch.xpack.ml.job.process.autodetect.output")
.build()).get();
cleanUp();
}

private void ensureAnomaliesWrite() throws InterruptedException {
Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build();
AtomicReference<AcknowledgedResponse> acknowledgedResponseHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(
listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener),
acknowledgedResponseHolder,
exceptionHolder);
if (exceptionHolder.get() != null) {
fail("FAILED TO MARK ["+ resultsIndex + "] as read-write again" + exceptionHolder.get());
}
}

private void setAnomaliesReadOnlyBlock() throws InterruptedException {
Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build();
AtomicReference<AcknowledgedResponse> acknowledgedResponseHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(
listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener),
acknowledgedResponseHolder,
exceptionHolder);
if (exceptionHolder.get() != null) {
fail("FAILED TO MARK ["+ resultsIndex + "] as read-ONLY: " + exceptionHolder.get());
}
}

public void testBulkFailureRetries() throws Exception {
Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
job.setResultsIndexName(jobId);

DatafeedConfig.Builder datafeedConfigBuilder =
createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
registerJob(job);
putJob(job);
openJob(job.getId());
registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);
long twoDaysAgo = now - 2 * DAY;
startDatafeed(datafeedConfig.getId(), 0L, twoDaysAgo);
waitUntilJobIsClosed(jobId);

// Get the job stats
Bucket initialLatestBucket = getLatestFinalizedBucket(jobId);
assertThat(initialLatestBucket.getEpoch(), greaterThan(0L));

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put("logger.org.elasticsearch.xpack.ml.datafeed.DatafeedJob", "TRACE")
.put("logger.org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister", "TRACE")
.put("logger.org.elasticsearch.xpack.ml.job.process.autodetect.output", "TRACE")
.put("xpack.ml.persist_results_max_retries", "15")
.build()).get();

setAnomaliesReadOnlyBlock();

int moreDocs = 1_000;
writeData(logger, index, moreDocs, twoDaysAgo, now);

openJob(job.getId());
startDatafeed(datafeedConfig.getId(), twoDaysAgo, now);

ensureAnomaliesWrite();
waitUntilJobIsClosed(jobId);

Bucket newLatestBucket = getLatestFinalizedBucket(jobId);
assertThat(newLatestBucket.getEpoch(), greaterThan(initialLatestBucket.getEpoch()));
}

private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) {
return createJob(id, bucketSpan, function, field, null);
}

private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field, String summaryCountField) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
przemekwitek marked this conversation as resolved.
Show resolved Hide resolved
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
dataDescription.setTimeField("time");
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);

Detector.Builder d = new Detector.Builder(function, field);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()))
.setBucketSpan(bucketSpan)
.setSummaryCountFieldName(summaryCountField);

return new Job.Builder().setId(id).setAnalysisConfig(analysisConfig).setDataDescription(dataDescription);
}

private void writeData(Logger logger, String index, long numDocs, long start, long end) {
int maxDelta = (int) (end - start - 1);
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int i = 0; i < numDocs; i++) {
IndexRequest indexRequest = new IndexRequest(index);
long timestamp = start + randomIntBetween(0, maxDelta);
assert timestamp >= start && timestamp < end;
indexRequest.source("time", timestamp, "value", i);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
if (bulkResponse.hasFailures()) {
int failures = 0;
for (BulkItemResponse itemResponse : bulkResponse) {
if (itemResponse.isFailed()) {
failures++;
logger.error("Item response failure [{}]", itemResponse.getFailureMessage());
}
}
fail("Bulk response contained " + failures + " failures");
}
logger.info("Indexed [{}] documents", numDocs);
}

private Bucket getLatestFinalizedBucket(String jobId) {
GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
getBucketsRequest.setExcludeInterim(true);
getBucketsRequest.setSort(Result.TIMESTAMP.getPreferredName());
getBucketsRequest.setDescending(true);
getBucketsRequest.setPageParams(new PageParams(0, 1));
return getBuckets(getBucketsRequest).get(0);
}

private <T> void blockingCall(Consumer<ActionListener<T>> function,
AtomicReference<T> response,
AtomicReference<Exception> error) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
ActionListener<T> listener = ActionListener.wrap(
r -> {
response.set(r);
latch.countDown();
},
e -> {
error.set(e);
latch.countDown();
}
);

function.accept(listener);
latch.await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@
import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;

import java.io.IOException;
import java.math.BigInteger;
Expand Down Expand Up @@ -446,7 +447,8 @@ public List<Setting<?>> getSettings() {
MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION,
InferenceProcessor.MAX_INFERENCE_PROCESSORS,
ModelLoadingService.INFERENCE_MODEL_CACHE_SIZE,
ModelLoadingService.INFERENCE_MODEL_CACHE_TTL
ModelLoadingService.INFERENCE_MODEL_CACHE_TTL,
ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES
);
}

Expand Down Expand Up @@ -520,9 +522,12 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName());
InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService.getNodeName());
this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor);
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(client, clusterService, settings);
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
JobResultsPersister jobResultsPersister = new JobResultsPersister(client);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client);
JobResultsPersister jobResultsPersister = new JobResultsPersister(client, resultsPersisterService, anomalyDetectionAuditor);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client,
resultsPersisterService,
anomalyDetectionAuditor);
JobConfigProvider jobConfigProvider = new JobConfigProvider(client, xContentRegistry);
DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(client, clusterService, threadPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private ActionListener<RevertModelSnapshotAction.Response> wrapRevertDataCountsL
return ActionListener.wrap(response -> {
jobResultsProvider.dataCounts(jobId, counts -> {
counts.setLatestRecordTimeStamp(modelSnapshot.getLatestRecordTimeStamp());
jobDataCountsPersister.persistDataCounts(jobId, counts, new ActionListener<Boolean>() {
jobDataCountsPersister.persistDataCountsAsync(jobId, counts, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
listener.onResponse(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;

import java.io.IOException;

Expand All @@ -33,30 +36,60 @@ public class JobDataCountsPersister {

private static final Logger logger = LogManager.getLogger(JobDataCountsPersister.class);

private final ResultsPersisterService resultsPersisterService;
private final Client client;
private final AnomalyDetectionAuditor auditor;

public JobDataCountsPersister(Client client) {
public JobDataCountsPersister(Client client, ResultsPersisterService resultsPersisterService, AnomalyDetectionAuditor auditor) {
this.resultsPersisterService = resultsPersisterService;
this.client = client;
this.auditor = auditor;
}

private XContentBuilder serialiseCounts(DataCounts counts) throws IOException {
private static XContentBuilder serialiseCounts(DataCounts counts) throws IOException {
XContentBuilder builder = jsonBuilder();
return counts.toXContent(builder, ToXContent.EMPTY_PARAMS);
}

/**
* Update the job's data counts stats and figures.
* NOTE: This call is synchronous and pauses the calling thread.
* @param jobId Job to update
* @param counts The counts
*/
public void persistDataCounts(String jobId, DataCounts counts) {
try {
resultsPersisterService.indexWithRetry(jobId,
AnomalyDetectorsIndex.resultsWriteAlias(jobId),
counts,
ToXContent.EMPTY_PARAMS,
WriteRequest.RefreshPolicy.NONE,
DataCounts.documentId(jobId),
() -> true,
(msg) -> auditor.warning(jobId, "Job data_counts " + msg));
} catch (IOException ioe) {
logger.error(() -> new ParameterizedMessage("[{}] Failed writing data_counts stats", jobId), ioe);
} catch (Exception ex) {
logger.error(() -> new ParameterizedMessage("[{}] Failed persisting data_counts stats", jobId), ex);
}
}

/**
* The same as {@link JobDataCountsPersister#persistDataCounts(String, DataCounts)} but done Asynchronously.
*
* Two differences are:
* - The listener is notified on persistence failure
* - If the persistence fails, it is not automatically retried
* @param jobId Job to update
* @param counts The counts
* @param listener ActionType response listener
*/
public void persistDataCounts(String jobId, DataCounts counts, ActionListener<Boolean> listener) {
public void persistDataCountsAsync(String jobId, DataCounts counts, ActionListener<Boolean> listener) {
try (XContentBuilder content = serialiseCounts(counts)) {
przemekwitek marked this conversation as resolved.
Show resolved Hide resolved
final IndexRequest request = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId))
.id(DataCounts.documentId(jobId))
.source(content);
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, request, new ActionListener<IndexResponse>() {
.id(DataCounts.documentId(jobId))
przemekwitek marked this conversation as resolved.
Show resolved Hide resolved
.source(content);
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, request, new ActionListener<>() {
@Override
public void onResponse(IndexResponse indexResponse) {
listener.onResponse(true);
Expand All @@ -68,7 +101,9 @@ public void onFailure(Exception e) {
}
});
} catch (IOException ioe) {
logger.warn((Supplier<?>)() -> new ParameterizedMessage("[{}] Error serialising DataCounts stats", jobId), ioe);
String msg = new ParameterizedMessage("[{}] Failed writing data_counts stats", jobId).getFormattedMessage();
logger.error(msg, ioe);
listener.onFailure(ExceptionsHelper.serverError(msg, ioe));
}
}
}
Loading