diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BulkFailureRetryIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BulkFailureRetryIT.java new file mode 100644 index 0000000000000..2a771130cac70 --- /dev/null +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BulkFailureRetryIT.java @@ -0,0 +1,214 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.action.util.PageParams; +import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.junit.After; +import org.junit.Before; + +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder; +import static org.hamcrest.Matchers.greaterThan; + +public class BulkFailureRetryIT extends MlNativeAutodetectIntegTestCase { + + private String index = "bulk-failure-retry"; + private long now = System.currentTimeMillis(); + private String jobId = "bulk-failure-retry-job"; + private String resultsIndex = ".ml-anomalies-custom-bulk-failure-retry-job"; + + @Before + public void putPastDataIntoIndex() { + client().admin().indices().prepareCreate(index) + .addMapping("type", "time", "type=date", "value", "type=long") + .get(); + long oneDayAgo = now - 86400000; + long twoDaysAgo = oneDayAgo - 86400000; + writeData(logger, index, 128, twoDaysAgo, oneDayAgo); + } + + @After + public void cleanUpTest() { + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder() + .putNull("logger.org.elasticsearch.xpack.ml") + .putNull("xpack.ml.persist_results_max_retries") + .build()).get(); + cleanUp(); + } + + private void ensureAnomaliesWrite() throws InterruptedException { + Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build(); + AtomicReference acknowledgedResponseHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + blockingCall( + listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener), + acknowledgedResponseHolder, + exceptionHolder); + if (exceptionHolder.get() != null) { + logger.error("FAILED TO MARK ["+ resultsIndex + "] as read-write again", exceptionHolder.get()); + } + } + + private void setAnomaliesReadOnlyBlock() throws InterruptedException { + Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build(); + AtomicReference acknowledgedResponseHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + blockingCall( + listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener), + acknowledgedResponseHolder, + exceptionHolder); + if (exceptionHolder.get() != null) { + logger.error("FAILED TO MARK ["+ resultsIndex + "] as read-write again", exceptionHolder.get()); + } + } + + public void testBulkFailureRetries() throws Exception { + Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null); + job.setResultsIndexName(jobId); + + DatafeedConfig.Builder datafeedConfigBuilder = + createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index)); + DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); + registerJob(job); + putJob(job); + openJob(job.getId()); + registerDatafeed(datafeedConfig); + putDatafeed(datafeedConfig); + startDatafeed(datafeedConfig.getId(), 0L, now - 86400000); + waitUntilJobIsClosed(jobId); + + // Get the job stats + Bucket initialLatestBucket = getLatestFinalizedBucket(jobId); + assertThat(initialLatestBucket.getEpoch(), greaterThan(0L)); + + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder() + .put("logger.org.elasticsearch.xpack.ml", "TRACE") + .put("xpack.ml.persist_results_max_retries", "10000") + .build()).get(); + + setAnomaliesReadOnlyBlock(); + + int moreDocs = 128; + long oneDayAgo = now - 86400000; + writeData(logger, index, moreDocs, oneDayAgo, now); + + openJob(job.getId()); + startDatafeed(datafeedConfig.getId(), oneDayAgo, now); + + // TODO Any better way????? + Thread.sleep(1000); + ensureAnomaliesWrite(); + waitUntilJobIsClosed(jobId); + + Bucket newLatestBucket = getLatestFinalizedBucket(jobId); + assertThat(newLatestBucket.getEpoch(), greaterThan(initialLatestBucket.getEpoch())); + } + + private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) { + return createJob(id, bucketSpan, function, field, null); + } + + private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field, String summaryCountField) { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); + dataDescription.setTimeField("time"); + dataDescription.setTimeFormat(DataDescription.EPOCH_MS); + + Detector.Builder d = new Detector.Builder(function, field); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + analysisConfig.setBucketSpan(bucketSpan); + analysisConfig.setSummaryCountFieldName(summaryCountField); + + Job.Builder builder = new Job.Builder(); + builder.setId(id); + builder.setAnalysisConfig(analysisConfig); + builder.setDataDescription(dataDescription); + return builder; + } + + private void writeData(Logger logger, String index, long numDocs, long start, long end) { + int maxDelta = (int) (end - start - 1); + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + for (int i = 0; i < numDocs; i++) { + IndexRequest indexRequest = new IndexRequest(index); + long timestamp = start + randomIntBetween(0, maxDelta); + assert timestamp >= start && timestamp < end; + indexRequest.source("time", timestamp, "value", i); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + if (bulkResponse.hasFailures()) { + int failures = 0; + for (BulkItemResponse itemResponse : bulkResponse) { + if (itemResponse.isFailed()) { + failures++; + logger.error("Item response failure [{}]", itemResponse.getFailureMessage()); + } + } + fail("Bulk response contained " + failures + " failures"); + } + logger.info("Indexed [{}] documents", numDocs); + } + + private Bucket getLatestFinalizedBucket(String jobId) { + GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId); + getBucketsRequest.setExcludeInterim(true); + getBucketsRequest.setSort(Result.TIMESTAMP.getPreferredName()); + getBucketsRequest.setDescending(true); + getBucketsRequest.setPageParams(new PageParams(0, 1)); + return getBuckets(getBucketsRequest).get(0); + } + + private void blockingCall(Consumer> function, + AtomicReference response, + AtomicReference error) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = ActionListener.wrap( + r -> { + response.set(r); + latch.countDown(); + }, + e -> { + error.set(e); + latch.countDown(); + } + ); + + function.accept(listener); + latch.await(); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 76442812ef256..4a4fd458b15a2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -228,6 +228,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess; import org.elasticsearch.xpack.ml.job.process.autodetect.NativeAutodetectProcessFactory; +import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.normalizer.MultiplyingNormalizerProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcessFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; @@ -446,7 +447,8 @@ public List> getSettings() { MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION, InferenceProcessor.MAX_INFERENCE_PROCESSORS, ModelLoadingService.INFERENCE_MODEL_CACHE_SIZE, - ModelLoadingService.INFERENCE_MODEL_CACHE_TTL + ModelLoadingService.INFERENCE_MODEL_CACHE_TTL, + AutodetectResultProcessor.PERSIST_RESULTS_MAX_RETRIES ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index 783706259a17b..d1f717d488f06 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -97,7 +97,7 @@ private Builder(String jobId) { * @param bucket The bucket to persist * @return this */ - public Builder persistBucket(Bucket bucket) { + public Builder persistBucket(Bucket bucket) throws BulkIndexException { // If the supplied bucket has records then create a copy with records // removed, because we never persist nested records in buckets Bucket bucketWithoutRecords = bucket; @@ -114,7 +114,7 @@ public Builder persistBucket(Bucket bucket) { return this; } - private void persistBucketInfluencersStandalone(String jobId, List bucketInfluencers) { + private void persistBucketInfluencersStandalone(String jobId, List bucketInfluencers) throws BulkIndexException { if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) { for (BucketInfluencer bucketInfluencer : bucketInfluencers) { String id = bucketInfluencer.getId(); @@ -130,7 +130,7 @@ private void persistBucketInfluencersStandalone(String jobId, List records) { + public Builder persistRecords(List records) throws BulkIndexException { for (AnomalyRecord record : records) { logger.trace("[{}] ES BULK ACTION: index record to index [{}] with ID [{}]", jobId, indexName, record.getId()); indexResult(record.getId(), record, "record"); @@ -161,7 +161,7 @@ public Builder persistRecords(List records) { * @param influencers the influencers to persist * @return this */ - public Builder persistInfluencers(List influencers) { + public Builder persistInfluencers(List influencers) throws BulkIndexException { for (Influencer influencer : influencers) { logger.trace("[{}] ES BULK ACTION: index influencer to index [{}] with ID [{}]", jobId, indexName, influencer.getId()); indexResult(influencer.getId(), influencer, "influencer"); @@ -170,30 +170,30 @@ public Builder persistInfluencers(List influencers) { return this; } - public Builder persistModelPlot(ModelPlot modelPlot) { + public Builder persistModelPlot(ModelPlot modelPlot) throws BulkIndexException { logger.trace("[{}] ES BULK ACTION: index model plot to index [{}] with ID [{}]", jobId, indexName, modelPlot.getId()); indexResult(modelPlot.getId(), modelPlot, "model plot"); return this; } - public Builder persistForecast(Forecast forecast) { + public Builder persistForecast(Forecast forecast) throws BulkIndexException { logger.trace("[{}] ES BULK ACTION: index forecast to index [{}] with ID [{}]", jobId, indexName, forecast.getId()); indexResult(forecast.getId(), forecast, Forecast.RESULT_TYPE_VALUE); return this; } - public Builder persistForecastRequestStats(ForecastRequestStats forecastRequestStats) { + public Builder persistForecastRequestStats(ForecastRequestStats forecastRequestStats) throws BulkIndexException { logger.trace("[{}] ES BULK ACTION: index forecast request stats to index [{}] with ID [{}]", jobId, indexName, forecastRequestStats.getId()); indexResult(forecastRequestStats.getId(), forecastRequestStats, Forecast.RESULT_TYPE_VALUE); return this; } - private void indexResult(String id, ToXContent resultDoc, String resultType) { + private void indexResult(String id, ToXContent resultDoc, String resultType) throws BulkIndexException { indexResult(id, resultDoc, ToXContent.EMPTY_PARAMS, resultType); } - private void indexResult(String id, ToXContent resultDoc, ToXContent.Params params, String resultType) { + private void indexResult(String id, ToXContent resultDoc, ToXContent.Params params, String resultType) throws BulkIndexException { try (XContentBuilder content = toXContentBuilder(resultDoc, params)) { bulkRequest.add(new IndexRequest(indexName).id(id).source(content)); } catch (IOException e) { @@ -208,7 +208,7 @@ private void indexResult(String id, ToXContent resultDoc, ToXContent.Params para /** * Execute the bulk action */ - public void executeRequest() { + public void executeRequest() throws BulkIndexException { if (bulkRequest.numberOfActions() == 0) { return; } @@ -218,6 +218,7 @@ public void executeRequest() { BulkResponse addRecordsResponse = client.bulk(bulkRequest).actionGet(); if (addRecordsResponse.hasFailures()) { logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage()); + throw new BulkIndexException(addRecordsResponse); } } @@ -411,4 +412,16 @@ private void logCall(String indexName) { } } } + + public static class BulkIndexException extends Exception { + + public BulkIndexException(String msg) { + super(msg); + } + + public BulkIndexException(BulkResponse bulkResponse) { + this(bulkResponse.buildFailureMessage()); + } + + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporter.java index 69d6936e9051f..8ac184db61b4b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporter.java @@ -36,7 +36,7 @@ public TimingStats getCurrentTimingStats() { return new TimingStats(currentTimingStats); } - public void reportBucket(Bucket bucket) { + public void reportBucket(Bucket bucket) throws JobResultsPersister.BulkIndexException { currentTimingStats.updateStats(bucket.getProcessingTimeMs()); currentTimingStats.setLatestRecordTimestamp(bucket.getTimestamp().toInstant().plusSeconds(bucket.getBucketSpan())); if (differSignificantly(currentTimingStats, persistedTimingStats)) { @@ -44,7 +44,7 @@ public void reportBucket(Bucket bucket) { } } - public void finishReporting() { + public void finishReporting() throws JobResultsPersister.BulkIndexException { // Don't flush if current timing stats are identical to the persisted ones if (currentTimingStats.equals(persistedTimingStats)) { return; @@ -52,7 +52,7 @@ public void finishReporting() { flush(); } - private void flush() { + private void flush() throws JobResultsPersister.BulkIndexException { persistedTimingStats = new TimingStats(currentTimingStats); bulkResultsPersister.persistTimingStats(persistedTimingStats); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java index dff118011b147..b00ae2816068f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java @@ -7,6 +7,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; @@ -235,7 +236,15 @@ public void finishReporting(ActionListener listener) { totalRecordStats.setLastDataTimeStamp(now); diagnostics.flush(); retrieveDiagnosticsIntermediateResults(); - dataCountsPersister.persistDataCounts(job.getId(), runningTotalStats(), listener); + dataCountsPersister.persistDataCounts(job.getId(), runningTotalStats(), ActionListener.wrap( + listener::onResponse, + e -> { + // Recording data counts should not cause the job processing to fail. + // Log the failure and move on. + logger.warn(() -> new ParameterizedMessage("[{}] failed to record data counts", job.getId()), e); + listener.onResponse(true); + } + )); } /** diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 9ef733198114e..27a79f4aa2ab1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -112,6 +112,7 @@ public class AutodetectProcessManager implements ClusterStateListener { private final AnomalyDetectionAuditor auditor; private volatile boolean upgradeInProgress; + private volatile int maximumBulkFailureRetries; public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor, ClusterService clusterService, @@ -131,15 +132,22 @@ public AutodetectProcessManager(Environment environment, Settings settings, Clie this.jobDataCountsPersister = jobDataCountsPersister; this.auditor = auditor; this.nativeStorageProvider = Objects.requireNonNull(nativeStorageProvider); + this.maximumBulkFailureRetries = AutodetectResultProcessor.PERSIST_RESULTS_MAX_RETRIES.get(settings); clusterService.addListener(this); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearning.MAX_OPEN_JOBS_PER_NODE, this::setMaxAllowedRunningJobs); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(AutodetectResultProcessor.PERSIST_RESULTS_MAX_RETRIES, this::setMaximumBulkFailureRetries); } void setMaxAllowedRunningJobs(int maxAllowedRunningJobs) { this.maxAllowedRunningJobs = maxAllowedRunningJobs; } + void setMaximumBulkFailureRetries(int maximumBulkFailureRetries) { + this.maximumBulkFailureRetries = maximumBulkFailureRetries; + } + public synchronized void closeAllJobsOnThisNode(String reason) { int numJobs = processByAllocation.size(); if (numJobs != 0) { @@ -519,7 +527,8 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet jobResultsPersister, process, autodetectParams.modelSizeStats(), - autodetectParams.timingStats()); + autodetectParams.timingStats(), + maximumBulkFailureRetries); ExecutorService autodetectWorkerExecutor; try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { autodetectWorkerExecutor = createAutodetectExecutorService(autodetectExecutorService); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java index c9441e9f60c39..610acebfa9604 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java @@ -8,12 +8,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.xpack.core.ml.MachineLearningField; @@ -71,6 +73,14 @@ */ public class AutodetectResultProcessor { + public static final Setting PERSIST_RESULTS_MAX_RETRIES = Setting.intSetting( + "xpack.ml.persist_results_max_retries", + 2, + 0, + Integer.MAX_VALUE - 2, + Setting.Property.Dynamic, + Setting.Property.NodeScope); + private static final Logger LOGGER = LogManager.getLogger(AutodetectResultProcessor.class); private final Client client; @@ -86,6 +96,7 @@ public class AutodetectResultProcessor { private final FlushListener flushListener; private volatile boolean processKilled; private volatile boolean failed; + private final int maximumFailureRetries; private int bucketCount; // only used from the process() thread, so doesn't need to be volatile private final JobResultsPersister.Builder bulkResultsPersister; private boolean deleteInterimRequired; @@ -102,15 +113,16 @@ public AutodetectResultProcessor(Client client, JobResultsPersister persister, AutodetectProcess process, ModelSizeStats latestModelSizeStats, - TimingStats timingStats) { - this(client, auditor, jobId, renormalizer, persister, process, latestModelSizeStats, timingStats, new FlushListener()); + TimingStats timingStats, + int maximumFailureRetries) { + this(client, auditor, jobId, renormalizer, persister, process, latestModelSizeStats, timingStats, new FlushListener(), + maximumFailureRetries); } // Visible for testing AutodetectResultProcessor(Client client, AnomalyDetectionAuditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister, AutodetectProcess autodetectProcess, ModelSizeStats latestModelSizeStats, - TimingStats timingStats, - FlushListener flushListener) { + TimingStats timingStats, FlushListener flushListener, int maximumFailureRetries) { this.client = Objects.requireNonNull(client); this.auditor = Objects.requireNonNull(auditor); this.jobId = Objects.requireNonNull(jobId); @@ -122,6 +134,7 @@ public AutodetectResultProcessor(Client client, this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId); this.timingStatsReporter = new TimingStatsReporter(timingStats, bulkResultsPersister); this.deleteInterimRequired = true; + this.maximumFailureRetries = maximumFailureRetries; } public void process() { @@ -137,6 +150,8 @@ public void process() { timingStatsReporter.finishReporting(); bulkResultsPersister.executeRequest(); } + } catch (JobResultsPersister.BulkIndexException e){ + bulkPersistWithRetry(); } catch (Exception e) { LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e); } @@ -176,11 +191,15 @@ private void readResults() { if (result.getBucket() != null) { LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount); } - } catch (Exception e) { - if (processKilled) { - throw e; + } catch (JobResultsPersister.BulkIndexException e) { + // Don't throw on bulk failures, just continue + if (isDeadOrDying()) { + continue; } - if (process.isProcessAliveAfterWaiting() == false) { + // attempt to retry if possible + bulkPersistWithRetry(); + } catch (Exception e) { + if (isDeadOrDying()) { throw e; } LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result", jobId), e); @@ -196,7 +215,7 @@ public void setProcessKilled() { renormalizer.shutdown(); } - void processResult(AutodetectResult result) { + void processResult(AutodetectResult result) throws JobResultsPersister.BulkIndexException { if (processKilled) { return; } @@ -310,6 +329,45 @@ void processResult(AutodetectResult result) { } } + void bulkPersistWithRetry() { + int attempts = 0; + while(attempts < maximumFailureRetries) { + try { + bulkResultsPersister.executeRequest(); + return; + } catch (JobResultsPersister.BulkIndexException ex) { + if (isDeadOrDying()) { + return; + } + final int currentAttempt = attempts; + LOGGER.trace( + () -> new ParameterizedMessage("[{}] bulk persist failure on attempt [{}] ", jobId, currentAttempt), + ex + ); + attempts++; + try { + double backOff = ((1 << attempts) - 1) / 2.0; + Thread.sleep((int)(backOff * 100)); + } catch (InterruptedException interrupt) { + LOGGER.warn( + () -> new ParameterizedMessage("[{}] failed bulk indexing of results after [{}] attempts", jobId, currentAttempt), + ex + ); + return; + } + } catch (Exception e) { + if (isDeadOrDying()) { + throw e; + } + LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result", jobId), e); + } + } + LOGGER.warn( + new ParameterizedMessage("[{}] failed bulk indexing of results after [{}] attempts", + jobId, + attempts)); + } + private void processModelSizeStats(ModelSizeStats modelSizeStats) { LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}", jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(), @@ -435,6 +493,10 @@ boolean isDeleteInterimRequired() { return deleteInterimRequired; } + private boolean isDeadOrDying() { + return processKilled || (process.isProcessAliveAfterWaiting() == false); + } + void setDeleteInterimRequired(boolean deleteInterimRequired) { this.deleteInterimRequired = deleteInterimRequired; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriter.java index 92fe2c3b0b50a..902fbb775f474 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriter.java @@ -53,7 +53,7 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter { * the OutputStream. No transformation is applied to the data the timestamp * is expected in seconds from the epoch. If any of the fields in * analysisFields or the DataDescriptions - * timeField is missing from the JOSN inputIndex an exception is thrown + * timeField is missing from the JSON inputIndex an exception is thrown */ @Override public void write(InputStream inputStream, CategorizationAnalyzer categorizationAnalyzer, XContentType xContentType, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index f6ec2fc9b89f1..703c1bb78916c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -100,7 +100,8 @@ public void createComponents() throws Exception { new JobResultsPersister(client()), process, new ModelSizeStats.Builder(JOB_ID).build(), - new TimingStats(JOB_ID)) { + new TimingStats(JOB_ID), + 2) { @Override protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java index a026d5d6c337b..d16164681adc3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java @@ -221,7 +221,7 @@ private void initClusterAndJob(String jobId) { client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); } - private void createBuckets(String jobId, int count) { + private void createBuckets(String jobId, int count) throws Exception { JobResultsPersister.Builder builder = jobResultsPersister.bulkPersisterBuilder(jobId); for (int i = 1; i <= count; ++i) { Bucket bucket = new Bucket(jobId, new Date(bucketSpan * i), bucketSpan); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java index 09e5cf9ce6331..913f297b9c1fe 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext; import org.mockito.ArgumentCaptor; -import java.io.IOException; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -51,7 +50,7 @@ public class JobResultsPersisterTests extends ESTestCase { private static final String JOB_ID = "foo"; - public void testPersistBucket_OneRecord() throws IOException { + public void testPersistBucket_OneRecord() throws Exception { ArgumentCaptor captor = ArgumentCaptor.forClass(BulkRequest.class); Client client = mockClient(captor); Bucket bucket = new Bucket("foo", new Date(), 123456); @@ -94,7 +93,7 @@ public void testPersistBucket_OneRecord() throws IOException { assertTrue(s.matches(".*raw_anomaly_score.:19\\.19.*")); } - public void testPersistRecords() throws IOException { + public void testPersistRecords() throws Exception { ArgumentCaptor captor = ArgumentCaptor.forClass(BulkRequest.class); Client client = mockClient(captor); @@ -149,7 +148,7 @@ public void testPersistRecords() throws IOException { assertTrue(s.matches(".*over_field_value.:.overValue.*")); } - public void testPersistInfluencers() throws IOException { + public void testPersistInfluencers() throws Exception { ArgumentCaptor captor = ArgumentCaptor.forClass(BulkRequest.class); Client client = mockClient(captor); @@ -173,7 +172,7 @@ public void testPersistInfluencers() throws IOException { assertTrue(s.matches(".*influencer_score.:16\\.0.*")); } - public void testExecuteRequest_ClearsBulkRequest() { + public void testExecuteRequest_ClearsBulkRequest() throws Exception { ArgumentCaptor captor = ArgumentCaptor.forClass(BulkRequest.class); Client client = mockClient(captor); JobResultsPersister persister = new JobResultsPersister(client); @@ -190,7 +189,7 @@ public void testExecuteRequest_ClearsBulkRequest() { assertEquals(0, builder.getBulkRequest().numberOfActions()); } - public void testBulkRequestExecutesWhenReachMaxDocs() { + public void testBulkRequestExecutesWhenReachMaxDocs() throws Exception { ArgumentCaptor captor = ArgumentCaptor.forClass(BulkRequest.class); Client client = mockClient(captor); JobResultsPersister persister = new JobResultsPersister(client); @@ -206,7 +205,7 @@ public void testBulkRequestExecutesWhenReachMaxDocs() { verifyNoMoreInteractions(client); } - public void testPersistTimingStats() { + public void testPersistTimingStats() throws Exception { ArgumentCaptor bulkRequestCaptor = ArgumentCaptor.forClass(BulkRequest.class); Client client = mockClient(bulkRequestCaptor); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java index d8a4d5aec24b2..124b0f7433d6f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java @@ -46,7 +46,7 @@ public void testGetCurrentTimingStats() { verifyZeroInteractions(bulkResultsPersister); } - public void testReporting() { + public void testReporting() throws Exception { TimingStatsReporter reporter = createReporter(new TimingStats(JOB_ID)); assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID))); @@ -65,7 +65,7 @@ public void testReporting() { verifyNoMoreInteractions(bulkResultsPersister); } - public void testFinishReporting() { + public void testFinishReporting() throws Exception { TimingStatsReporter reporter = createReporter(new TimingStats(JOB_ID)); assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID))); @@ -87,14 +87,14 @@ public void testFinishReporting() { verifyNoMoreInteractions(bulkResultsPersister); } - public void testFinishReporting_NoChange() { + public void testFinishReporting_NoChange() throws Exception { TimingStatsReporter reporter = createReporter(new TimingStats(JOB_ID)); reporter.finishReporting(); verifyZeroInteractions(bulkResultsPersister); } - public void testFinishReporting_WithChange() { + public void testFinishReporting_WithChange() throws Exception { TimingStatsReporter reporter = createReporter(new TimingStats(JOB_ID)); reporter.reportBucket(createBucket(10)); reporter.finishReporting(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 72bdf45a96c4a..be858570a56b2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -47,6 +47,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; +import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; @@ -61,6 +62,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashSet; @@ -151,7 +153,9 @@ public void setup() throws Exception { auditor = mock(AnomalyDetectionAuditor.class); clusterService = mock(ClusterService.class); ClusterSettings clusterSettings = - new ClusterSettings(Settings.EMPTY, Collections.singleton(MachineLearning.MAX_OPEN_JOBS_PER_NODE)); + new ClusterSettings(Settings.EMPTY, + new HashSet<>(Arrays.asList(MachineLearning.MAX_OPEN_JOBS_PER_NODE, + AutodetectResultProcessor.PERSIST_RESULTS_MAX_RETRIES))); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); MetaData metaData = mock(MetaData.class); SortedMap aliasOrIndexSortedMap = new TreeMap<>(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java index 63ca73444b540..aad2b557fe170 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java @@ -107,7 +107,8 @@ public void setUpMocks() { process, new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), new TimingStats(JOB_ID), - flushListener); + flushListener, + 2); } @After @@ -130,7 +131,7 @@ public void testProcess() throws TimeoutException { verify(persister).commitStateWrites(JOB_ID); } - public void testProcessResult_bucket() { + public void testProcessResult_bucket() throws Exception { when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); AutodetectResult result = mock(AutodetectResult.class); @@ -147,7 +148,7 @@ public void testProcessResult_bucket() { verify(persister, never()).deleteInterimResults(JOB_ID); } - public void testProcessResult_bucket_deleteInterimRequired() { + public void testProcessResult_bucket_deleteInterimRequired() throws Exception { when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); AutodetectResult result = mock(AutodetectResult.class); @@ -164,7 +165,7 @@ public void testProcessResult_bucket_deleteInterimRequired() { verify(persister).deleteInterimResults(JOB_ID); } - public void testProcessResult_records() { + public void testProcessResult_records() throws Exception { AutodetectResult result = mock(AutodetectResult.class); List records = Arrays.asList( @@ -180,7 +181,7 @@ public void testProcessResult_records() { verify(persister).bulkPersisterBuilder(JOB_ID); } - public void testProcessResult_influencers() { + public void testProcessResult_influencers() throws Exception { AutodetectResult result = mock(AutodetectResult.class); List influencers = Arrays.asList( @@ -196,7 +197,7 @@ public void testProcessResult_influencers() { verify(persister).bulkPersisterBuilder(JOB_ID); } - public void testProcessResult_categoryDefinition() { + public void testProcessResult_categoryDefinition() throws Exception { AutodetectResult result = mock(AutodetectResult.class); CategoryDefinition categoryDefinition = mock(CategoryDefinition.class); when(result.getCategoryDefinition()).thenReturn(categoryDefinition); @@ -209,7 +210,7 @@ public void testProcessResult_categoryDefinition() { verify(persister).bulkPersisterBuilder(JOB_ID); } - public void testProcessResult_flushAcknowledgement() { + public void testProcessResult_flushAcknowledgement() throws Exception { AutodetectResult result = mock(AutodetectResult.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); when(flushAcknowledgement.getId()).thenReturn(JOB_ID); @@ -225,7 +226,7 @@ public void testProcessResult_flushAcknowledgement() { verify(bulkBuilder).executeRequest(); } - public void testProcessResult_flushAcknowledgementMustBeProcessedLast() { + public void testProcessResult_flushAcknowledgementMustBeProcessedLast() throws Exception { AutodetectResult result = mock(AutodetectResult.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); when(flushAcknowledgement.getId()).thenReturn(JOB_ID); @@ -245,7 +246,7 @@ public void testProcessResult_flushAcknowledgementMustBeProcessedLast() { inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement, null); } - public void testProcessResult_modelPlot() { + public void testProcessResult_modelPlot() throws Exception { AutodetectResult result = mock(AutodetectResult.class); ModelPlot modelPlot = mock(ModelPlot.class); when(result.getModelPlot()).thenReturn(modelPlot); @@ -257,7 +258,7 @@ public void testProcessResult_modelPlot() { verify(bulkBuilder).persistModelPlot(modelPlot); } - public void testProcessResult_modelSizeStats() { + public void testProcessResult_modelSizeStats() throws Exception { AutodetectResult result = mock(AutodetectResult.class); ModelSizeStats modelSizeStats = mock(ModelSizeStats.class); when(result.getModelSizeStats()).thenReturn(modelSizeStats); @@ -270,7 +271,7 @@ public void testProcessResult_modelSizeStats() { verify(persister).persistModelSizeStats(modelSizeStats); } - public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() { + public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() throws Exception { TimeValue delay = TimeValue.timeValueSeconds(5); // Set up schedule delay time when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString())) @@ -310,7 +311,7 @@ public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() { verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb", "1kb")); } - public void testProcessResult_modelSnapshot() { + public void testProcessResult_modelSnapshot() throws Exception { AutodetectResult result = mock(AutodetectResult.class); ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID) .setSnapshotId("a_snapshot_id") @@ -333,7 +334,7 @@ public void testProcessResult_modelSnapshot() { verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any()); } - public void testProcessResult_quantiles_givenRenormalizationIsEnabled() { + public void testProcessResult_quantiles_givenRenormalizationIsEnabled() throws Exception { AutodetectResult result = mock(AutodetectResult.class); Quantiles quantiles = mock(Quantiles.class); when(result.getQuantiles()).thenReturn(quantiles); @@ -350,7 +351,7 @@ public void testProcessResult_quantiles_givenRenormalizationIsEnabled() { verify(renormalizer).renormalize(quantiles); } - public void testProcessResult_quantiles_givenRenormalizationIsDisabled() { + public void testProcessResult_quantiles_givenRenormalizationIsDisabled() throws Exception { AutodetectResult result = mock(AutodetectResult.class); Quantiles quantiles = mock(Quantiles.class); when(result.getQuantiles()).thenReturn(quantiles); @@ -433,4 +434,107 @@ public void testKill() throws TimeoutException { verify(renormalizer).waitUntilIdle(); verify(flushListener).clear(); } + + public void testBulkPersistWithRetry() throws Exception { + JobResultsPersister persister = mock(JobResultsPersister.class); + JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); + when(process.isProcessAlive()).thenReturn(true); + when(process.isProcessAliveAfterWaiting()).thenReturn(true); + doThrow(new JobResultsPersister.BulkIndexException("boom")) + .when(bulkBuilder).executeRequest(); + when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); + AutodetectResultProcessor processorUnderTest = new AutodetectResultProcessor( + client, + auditor, + JOB_ID, + renormalizer, + persister, + process, + new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), + new TimingStats(JOB_ID), + flushListener, + 2); + + processorUnderTest.bulkPersistWithRetry(); + + // Hax as memory persister needs to be verified before @After + verify(this.persister).bulkPersisterBuilder(JOB_ID); + verify(bulkBuilder, times(2)).executeRequest(); + } + + public void testBulkPersistWithRetry_withDeadProcess() throws Exception { + JobResultsPersister persister = mock(JobResultsPersister.class); + JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); + when(process.isProcessAliveAfterWaiting()).thenReturn(true); + doThrow(new JobResultsPersister.BulkIndexException("boom")) + .when(bulkBuilder).executeRequest(); + when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); + AutodetectResultProcessor processorUnderTest = new AutodetectResultProcessor( + client, + auditor, + JOB_ID, + renormalizer, + persister, + process, + new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), + new TimingStats(JOB_ID), + flushListener, + 2); + + processorUnderTest.setProcessKilled(); + processorUnderTest.bulkPersistWithRetry(); + verify(bulkBuilder, times(1)).executeRequest(); + // Hax as memory persister needs to be verified before @After + verify(this.persister).bulkPersisterBuilder(JOB_ID); + verify(this.renormalizer).shutdown(); + } + + public void testBulkPersistWithRetry_withDyingProcess() throws Exception { + JobResultsPersister persister = mock(JobResultsPersister.class); + JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); + when(process.isProcessAliveAfterWaiting()).thenReturn(false); + doThrow(new JobResultsPersister.BulkIndexException("boom")) + .when(bulkBuilder).executeRequest(); + when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); + AutodetectResultProcessor processorUnderTest = new AutodetectResultProcessor( + client, + auditor, + JOB_ID, + renormalizer, + persister, + process, + new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), + new TimingStats(JOB_ID), + flushListener, + 2); + + processorUnderTest.bulkPersistWithRetry(); + verify(bulkBuilder, times(1)).executeRequest(); + // Hax as memory persister needs to be verified before @After + verify(this.persister).bulkPersisterBuilder(JOB_ID); + } + + public void testBulkPersistWithRetry_withExceptionOtherThanBulkFailure() throws Exception { + JobResultsPersister persister = mock(JobResultsPersister.class); + JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); + when(process.isProcessAliveAfterWaiting()).thenReturn(false); + doThrow(new ElasticsearchException("boom")) + .when(bulkBuilder).executeRequest(); + when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); + AutodetectResultProcessor processorUnderTest = new AutodetectResultProcessor( + client, + auditor, + JOB_ID, + renormalizer, + persister, + process, + new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), + new TimingStats(JOB_ID), + flushListener, + 2); + + expectThrows(ElasticsearchException.class, processorUnderTest::bulkPersistWithRetry); + // Hax as memory persister needs to be verified before @After + verify(this.persister).bulkPersisterBuilder(JOB_ID); + } }