[ML] Add graceful retry for bulk index results failures
benwtrent committed Nov 22, 2019
1 parent 0390ec3 commit 19c67d6
Showing 14 changed files with 472 additions and 55 deletions.
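The change, in outline: JobResultsPersister.Builder.executeRequest() now throws a checked BulkIndexException whenever the bulk response reports item failures, the persist* methods propagate it, and the autodetect result processor retries the failed bulk request up to a configurable limit exposed as the dynamic cluster setting xpack.ml.persist_results_max_retries. The retry loop itself lives in AutodetectResultProcessor, which is not expanded in this view; the sketch below illustrates the idea only, with the class name PersistRetrySketch, the backoff values, and the give-up behavior all invented for illustration.

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;

// Illustrative sketch only -- not the commit's actual retry code.
final class PersistRetrySketch {

    // Retries the bulk request until it succeeds or maxRetries
    // (xpack.ml.persist_results_max_retries) is exhausted.
    static void persistWithRetry(JobResultsPersister.Builder bulkBuilder, String jobId, int maxRetries) {
        long backoffMs = 50; // assumed initial backoff between attempts
        for (int attempt = 0; ; attempt++) {
            try {
                bulkBuilder.executeRequest(); // throws BulkIndexException on item failures
                return;
            } catch (JobResultsPersister.BulkIndexException e) {
                if (attempt >= maxRetries) {
                    throw new ElasticsearchException(
                        "[" + jobId + "] failed to persist results after [" + attempt + "] retries", e);
                }
                try {
                    Thread.sleep(backoffMs); // simple exponential backoff; the real code may differ
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return;
                }
                backoffMs = Math.min(backoffMs * 2, 5_000);
            }
        }
    }
}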
BulkFailureRetryIT.java (new file)
@@ -0,0 +1,214 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.junit.After;
import org.junit.Before;

import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
import static org.hamcrest.Matchers.greaterThan;

public class BulkFailureRetryIT extends MlNativeAutodetectIntegTestCase {

private static final long DAY_MS = 86_400_000L;

private final String index = "bulk-failure-retry";
private final long now = System.currentTimeMillis();
private final String jobId = "bulk-failure-retry-job";
private final String resultsIndex = ".ml-anomalies-custom-bulk-failure-retry-job";

@Before
public void putPastDataIntoIndex() {
client().admin().indices().prepareCreate(index)
.addMapping("type", "time", "type=date", "value", "type=long")
.get();
long oneDayAgo = now - DAY_MS;
long twoDaysAgo = oneDayAgo - DAY_MS;
writeData(logger, index, 128, twoDaysAgo, oneDayAgo);
}

@After
public void cleanUpTest() {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.putNull("logger.org.elasticsearch.xpack.ml")
.putNull("xpack.ml.persist_results_max_retries")
.build()).get();
cleanUp();
}

private void ensureAnomaliesWrite() throws InterruptedException {
Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build();
AtomicReference<AcknowledgedResponse> acknowledgedResponseHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(
listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener),
acknowledgedResponseHolder,
exceptionHolder);
if (exceptionHolder.get() != null) {
logger.error("FAILED TO MARK ["+ resultsIndex + "] as read-write again", exceptionHolder.get());
}
}

private void setAnomaliesReadOnlyBlock() throws InterruptedException {
Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build();
AtomicReference<AcknowledgedResponse> acknowledgedResponseHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(
listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener),
acknowledgedResponseHolder,
exceptionHolder);
if (exceptionHolder.get() != null) {
logger.error("FAILED TO MARK ["+ resultsIndex + "] as read-write again", exceptionHolder.get());
}
}

public void testBulkFailureRetries() throws Exception {
Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
job.setResultsIndexName(jobId);

DatafeedConfig.Builder datafeedConfigBuilder =
createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
registerJob(job);
putJob(job);
openJob(job.getId());
registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);
startDatafeed(datafeedConfig.getId(), 0L, now - DAY_MS);
waitUntilJobIsClosed(jobId);

// Record the latest finalized bucket before results writes are blocked
Bucket initialLatestBucket = getLatestFinalizedBucket(jobId);
assertThat(initialLatestBucket.getEpoch(), greaterThan(0L));

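// Raise the retry limit far above the default and enable TRACE logging so
// the bulk-index retries are visible while the results index is blocked.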
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put("logger.org.elasticsearch.xpack.ml", "TRACE")
.put("xpack.ml.persist_results_max_retries", "10000")
.build()).get();

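// Force bulk item failures by putting a read-only block on the results index.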
setAnomaliesReadOnlyBlock();

int moreDocs = 128;
long oneDayAgo = now - DAY_MS;
writeData(logger, index, moreDocs, oneDayAgo, now);

openJob(job.getId());
startDatafeed(datafeedConfig.getId(), oneDayAgo, now);

// Give the job time to start hitting bulk-index failures against the
// blocked results index. TODO: replace the sleep with a deterministic wait.
Thread.sleep(1000);
ensureAnomaliesWrite();
waitUntilJobIsClosed(jobId);

Bucket newLatestBucket = getLatestFinalizedBucket(jobId);
assertThat(newLatestBucket.getEpoch(), greaterThan(initialLatestBucket.getEpoch()));
}

private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) {
return createJob(id, bucketSpan, function, field, null);
}

private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field, String summaryCountField) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
dataDescription.setTimeField("time");
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);

Detector.Builder d = new Detector.Builder(function, field);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
analysisConfig.setBucketSpan(bucketSpan);
analysisConfig.setSummaryCountFieldName(summaryCountField);

Job.Builder builder = new Job.Builder();
builder.setId(id);
builder.setAnalysisConfig(analysisConfig);
builder.setDataDescription(dataDescription);
return builder;
}

private void writeData(Logger logger, String index, long numDocs, long start, long end) {
int maxDelta = (int) (end - start - 1);
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int i = 0; i < numDocs; i++) {
IndexRequest indexRequest = new IndexRequest(index);
long timestamp = start + randomIntBetween(0, maxDelta);
assert timestamp >= start && timestamp < end;
indexRequest.source("time", timestamp, "value", i);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
if (bulkResponse.hasFailures()) {
int failures = 0;
for (BulkItemResponse itemResponse : bulkResponse) {
if (itemResponse.isFailed()) {
failures++;
logger.error("Item response failure [{}]", itemResponse.getFailureMessage());
}
}
fail("Bulk response contained " + failures + " failures");
}
logger.info("Indexed [{}] documents", numDocs);
}

private Bucket getLatestFinalizedBucket(String jobId) {
GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
getBucketsRequest.setExcludeInterim(true);
getBucketsRequest.setSort(Result.TIMESTAMP.getPreferredName());
getBucketsRequest.setDescending(true);
getBucketsRequest.setPageParams(new PageParams(0, 1));
return getBuckets(getBucketsRequest).get(0);
}

private <T> void blockingCall(Consumer<ActionListener<T>> function,
AtomicReference<T> response,
AtomicReference<Exception> error) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
ActionListener<T> listener = ActionListener.wrap(
r -> {
response.set(r);
latch.countDown();
},
e -> {
error.set(e);
latch.countDown();
}
);

function.accept(listener);
latch.await();
}
}
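For reference, the test above first runs the datafeed over the older day of data and records the latest finalized bucket. It then blocks the job's custom results index, feeds in the newer day of data, and relies on xpack.ml.persist_results_max_retries to keep the job retrying instead of failing; once the block is lifted, the retried writes go through and a newer finalized bucket proves the results survived the outage.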
MachineLearning.java
@@ -228,6 +228,7 @@
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.NativeAutodetectProcessFactory;
+import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.normalizer.MultiplyingNormalizerProcess;
import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcessFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
@@ -446,7 +447,8 @@ public List<Setting<?>> getSettings() {
MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION,
InferenceProcessor.MAX_INFERENCE_PROCESSORS,
ModelLoadingService.INFERENCE_MODEL_CACHE_SIZE,
-ModelLoadingService.INFERENCE_MODEL_CACHE_TTL
+ModelLoadingService.INFERENCE_MODEL_CACHE_TTL,
+AutodetectResultProcessor.PERSIST_RESULTS_MAX_RETRIES
);
}

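AutodetectResultProcessor.PERSIST_RESULTS_MAX_RETRIES, registered above, backs the xpack.ml.persist_results_max_retries setting the integration test tunes at runtime. Its declaration is not expanded in this view; a dynamic integer setting registered this way would look roughly like the following, where the default and minimum are placeholders, not the commit's actual values.

import org.elasticsearch.common.settings.Setting;

// Hypothetical declaration; the real default and bounds live in
// AutodetectResultProcessor and may differ.
public class SettingSketch {
    public static final Setting<Integer> PERSIST_RESULTS_MAX_RETRIES = Setting.intSetting(
        "xpack.ml.persist_results_max_retries",
        20, // assumed default
        0,  // assumed minimum
        Setting.Property.Dynamic,
        Setting.Property.NodeScope);
}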
JobResultsPersister.java
@@ -97,7 +97,7 @@ private Builder(String jobId) {
* @param bucket The bucket to persist
* @return this
*/
-public Builder persistBucket(Bucket bucket) {
+public Builder persistBucket(Bucket bucket) throws BulkIndexException {
// If the supplied bucket has records then create a copy with records
// removed, because we never persist nested records in buckets
Bucket bucketWithoutRecords = bucket;
@@ -114,7 +114,7 @@ public Builder persistBucket(Bucket bucket) {
return this;
}

-private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluencer> bucketInfluencers) {
+private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluencer> bucketInfluencers) throws BulkIndexException {
if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
String id = bucketInfluencer.getId();
@@ -130,7 +130,7 @@ private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluen
* @param timingStats timing stats to persist
* @return this
*/
-public Builder persistTimingStats(TimingStats timingStats) {
+public Builder persistTimingStats(TimingStats timingStats) throws BulkIndexException {
indexResult(
TimingStats.documentId(timingStats.getJobId()),
timingStats,
@@ -145,7 +145,7 @@ public Builder persistTimingStats(TimingStats timingStats) {
* @param records the records to persist
* @return this
*/
-public Builder persistRecords(List<AnomalyRecord> records) {
+public Builder persistRecords(List<AnomalyRecord> records) throws BulkIndexException {
for (AnomalyRecord record : records) {
logger.trace("[{}] ES BULK ACTION: index record to index [{}] with ID [{}]", jobId, indexName, record.getId());
indexResult(record.getId(), record, "record");
@@ -161,7 +161,7 @@ public Builder persistRecords(List<AnomalyRecord> records) {
* @param influencers the influencers to persist
* @return this
*/
-public Builder persistInfluencers(List<Influencer> influencers) {
+public Builder persistInfluencers(List<Influencer> influencers) throws BulkIndexException {
for (Influencer influencer : influencers) {
logger.trace("[{}] ES BULK ACTION: index influencer to index [{}] with ID [{}]", jobId, indexName, influencer.getId());
indexResult(influencer.getId(), influencer, "influencer");
@@ -170,30 +170,30 @@ public Builder persistInfluencers(List<Influencer> influencers) {
return this;
}

-public Builder persistModelPlot(ModelPlot modelPlot) {
+public Builder persistModelPlot(ModelPlot modelPlot) throws BulkIndexException {
logger.trace("[{}] ES BULK ACTION: index model plot to index [{}] with ID [{}]", jobId, indexName, modelPlot.getId());
indexResult(modelPlot.getId(), modelPlot, "model plot");
return this;
}

-public Builder persistForecast(Forecast forecast) {
+public Builder persistForecast(Forecast forecast) throws BulkIndexException {
logger.trace("[{}] ES BULK ACTION: index forecast to index [{}] with ID [{}]", jobId, indexName, forecast.getId());
indexResult(forecast.getId(), forecast, Forecast.RESULT_TYPE_VALUE);
return this;
}

-public Builder persistForecastRequestStats(ForecastRequestStats forecastRequestStats) {
+public Builder persistForecastRequestStats(ForecastRequestStats forecastRequestStats) throws BulkIndexException {
logger.trace("[{}] ES BULK ACTION: index forecast request stats to index [{}] with ID [{}]", jobId, indexName,
forecastRequestStats.getId());
indexResult(forecastRequestStats.getId(), forecastRequestStats, Forecast.RESULT_TYPE_VALUE);
return this;
}

-private void indexResult(String id, ToXContent resultDoc, String resultType) {
+private void indexResult(String id, ToXContent resultDoc, String resultType) throws BulkIndexException {
indexResult(id, resultDoc, ToXContent.EMPTY_PARAMS, resultType);
}

-private void indexResult(String id, ToXContent resultDoc, ToXContent.Params params, String resultType) {
+private void indexResult(String id, ToXContent resultDoc, ToXContent.Params params, String resultType) throws BulkIndexException {
try (XContentBuilder content = toXContentBuilder(resultDoc, params)) {
bulkRequest.add(new IndexRequest(indexName).id(id).source(content));
} catch (IOException e) {
@@ -208,7 +208,7 @@ private void indexResult(String id, ToXContent resultDoc, ToXContent.Params para
/**
* Execute the bulk action
*/
-public void executeRequest() {
+public void executeRequest() throws BulkIndexException {
if (bulkRequest.numberOfActions() == 0) {
return;
}
@@ -218,6 +218,7 @@ public void executeRequest() {
BulkResponse addRecordsResponse = client.bulk(bulkRequest).actionGet();
if (addRecordsResponse.hasFailures()) {
logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage());
+throw new BulkIndexException(addRecordsResponse);
}
}

@@ -411,4 +412,16 @@ private void logCall(String indexName) {
}
}
}
+
+public static class BulkIndexException extends Exception {
+
+    public BulkIndexException(String msg) {
+        super(msg);
+    }
+
+    public BulkIndexException(BulkResponse bulkResponse) {
+        this(bulkResponse.buildFailureMessage());
+    }
+
+}
}
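Because BulkIndexException is checked, every persist call in the builder has to declare it, which is why the throws clauses above fan out from indexResult() and executeRequest() through persistBucket(), persistRecords(), persistInfluencers(), and the rest: callers can no longer ignore a failed bulk write.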
TimingStatsReporter.java
@@ -36,23 +36,23 @@ public TimingStats getCurrentTimingStats() {
return new TimingStats(currentTimingStats);
}

-public void reportBucket(Bucket bucket) {
+public void reportBucket(Bucket bucket) throws JobResultsPersister.BulkIndexException {
currentTimingStats.updateStats(bucket.getProcessingTimeMs());
currentTimingStats.setLatestRecordTimestamp(bucket.getTimestamp().toInstant().plusSeconds(bucket.getBucketSpan()));
if (differSignificantly(currentTimingStats, persistedTimingStats)) {
flush();
}
}

-public void finishReporting() {
+public void finishReporting() throws JobResultsPersister.BulkIndexException {
// Don't flush if current timing stats are identical to the persisted ones
if (currentTimingStats.equals(persistedTimingStats)) {
return;
}
flush();
}

-private void flush() {
+private void flush() throws JobResultsPersister.BulkIndexException {
persistedTimingStats = new TimingStats(currentTimingStats);
bulkResultsPersister.persistTimingStats(persistedTimingStats);
}
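The TimingStatsReporter changes are signature plumbing only: flush() delegates to bulkResultsPersister.persistTimingStats(), so the new checked exception surfaces through reportBucket() and finishReporting() to the result processor, where the retry logic can deal with it.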
DataCountsReporter.java
@@ -7,6 +7,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
@@ -235,7 +236,15 @@ public void finishReporting(ActionListener<Boolean> listener) {
totalRecordStats.setLastDataTimeStamp(now);
diagnostics.flush();
retrieveDiagnosticsIntermediateResults();
-dataCountsPersister.persistDataCounts(job.getId(), runningTotalStats(), listener);
+dataCountsPersister.persistDataCounts(job.getId(), runningTotalStats(), ActionListener.wrap(
+    listener::onResponse,
+    e -> {
+        // Recording data counts should not cause the job processing to fail.
+        // Log the failure and move on.
+        logger.warn(() -> new ParameterizedMessage("[{}] failed to record data counts", job.getId()), e);
+        listener.onResponse(true);
+    }
+));
}

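Note the contrast with results persistence: data counts are treated as best effort. A failed persist is logged at WARN and the listener still gets a success response, so a job is never failed over bookkeeping, whereas result documents are now retried up to the configured limit.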
(The remaining changed files in this commit are not rendered in this view.)
