From 285c81ef6311e3a2db5dd1235ca9964d241fa083 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Fri, 22 Nov 2019 12:39:54 -0500 Subject: [PATCH] [ML] Stop timing stats failure propagation (#49495) --- .../xpack/ml/datafeed/DatafeedJob.java | 8 +------- .../ml/datafeed/DatafeedTimingStatsReporter.java | 13 ++++++++++++- .../xpack/ml/datafeed/DatafeedJobTests.java | 11 ----------- .../DatafeedTimingStatsReporterTests.java | 15 +++++++++++++++ 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 55064dd3e38d7..22ebff57a4ba8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -117,13 +117,7 @@ public Integer getMaxEmptySearches() { } public void finishReportingTimingStats() { - try { - timingStatsReporter.finishReporting(); - } catch (Exception e) { - // We don't want the exception to propagate out of this method as it can leave the datafeed in the "stopping" state forever. - // Since persisting datafeed timing stats is not critical, we just log a warning here. - LOGGER.warn("[{}] Datafeed timing stats could not be reported due to: {}", jobId, e); - } + timingStatsReporter.finishReporting(); } Long runLookBack(long startTime, Long endTime) throws Exception { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java index b11761541dfba..2878c48db3b70 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java @@ -5,6 +5,9 @@ */ package org.elasticsearch.xpack.ml.datafeed; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; @@ -20,6 +23,7 @@ */ public class DatafeedTimingStatsReporter { + private static final Logger LOGGER = LogManager.getLogger(DatafeedTimingStatsReporter.class); /** Interface used for persisting current timing stats to the results index. */ @FunctionalInterface public interface DatafeedTimingStatsPersister { @@ -96,7 +100,14 @@ private void flushIfDifferSignificantly() { private void flush(WriteRequest.RefreshPolicy refreshPolicy) { persistedTimingStats = new DatafeedTimingStats(currentTimingStats); if (allowedPersisting) { - persister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy); + try { + persister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy); + } catch (Exception ex) { + // Since persisting datafeed timing stats is not critical, we just log a warning here. + LOGGER.warn( + () -> new ParameterizedMessage("[{}] failed to report datafeed timing stats", currentTimingStats.getJobId()), + ex); + } } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index 6fc4ad24e06c4..cf3cb9646617a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -12,7 +12,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -61,7 +60,6 @@ import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.atMost; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -456,15 +454,6 @@ public void testFlushAnalysisProblemIsConflict() { assertThat(analysisProblemException.shouldStop, is(true)); } - public void testFinishReportingTimingStats() { - doThrow(new EsRejectedExecutionException()).when(timingStatsReporter).finishReporting(); - - long frequencyMs = 100; - long queryDelayMs = 1000; - DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1, randomBoolean()); - datafeedJob.finishReportingTimingStats(); - } - private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs, long latestRecordTimeMs, boolean haveSeenDataPreviously) { Supplier currentTimeSupplier = () -> currentTime; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java index 62a4b4ef4d49a..81f3b7710f544 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.datafeed; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; @@ -18,8 +19,10 @@ import java.sql.Date; import java.time.Instant; +import static org.elasticsearch.mock.orig.Mockito.doThrow; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -176,6 +179,18 @@ public void testTimingStatsDifferSignificantly() { is(true)); } + public void testFinishReportingTimingStatsException() { + doThrow(new ElasticsearchException("BOOM")).when(timingStatsPersister).persistDatafeedTimingStats(any(), any()); + DatafeedTimingStatsReporter reporter = createReporter(new DatafeedTimingStats(JOB_ID)); + + try { + reporter.reportDataCounts(createDataCounts(0, TIMESTAMP)); + reporter.finishReporting(); + } catch (ElasticsearchException ex) { + fail("Should not have failed with: " + ex.getDetailedMessage()); + } + } + private DatafeedTimingStatsReporter createReporter(DatafeedTimingStats timingStats) { return new DatafeedTimingStatsReporter(timingStats, timingStatsPersister); }