[ML] Add graceful retry for anomaly detector result indexing failures #49508
Conversation
Pinging @elastic/ml-core (:ml)
openJob(job.getId());
startDatafeed(datafeedConfig.getId(), oneDayAgo, now);

// TODO Any better way?????
I am not sure of a way to ask about the internal state...I wonder if we can read the node logs to see if there is an entry indicating that the bulk index failed?
@@ -218,6 +218,7 @@ public void executeRequest() {
    BulkResponse addRecordsResponse = client.bulk(bulkRequest).actionGet();
    if (addRecordsResponse.hasFailures()) {
        logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage());
        throw new BulkIndexException(addRecordsResponse);
I thought it best to throw here, and handle retries up the stack. That way the retries know about the processor state and can stop retrying if the processor died (or is dying).
Verified through testing that the datafeed does feel the backpressure in two ways:
Previously, bulk index requests did not throw exceptions. With this in mind, the flush failure behavior changed: flush requests can now fail due to the items not being written (previously the error was just logged). This is OK, as we now actually report the failure to the listener if the flush fails. The datafeedJob will continue executing even after a failed flush (logging the failure) and proceed to the next execution time (for real-time).
Digging through line 218 in 19c67d6 (the bulk call shown above), it is now possible that if a bulk index request fails there, then the rest of the results processing could be skipped (since bulk index failures now throw). This is slightly different from before, where the rest of the results processing would continue as normal (even if previous results failed to index). In reality, if one of the results failed to bulk index, it could be assumed that the rest would fail as well. But we want to make this more reliable, so maybe the retries should be inside each of the individual… @droberts195 what do you think?
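For illustration only, one shape per-call retries could take is a wrapper that keeps the throw-and-retry-up-the-stack approach while giving up once the processor is dead. Names such as isDeadOrDying() and maximumFailureRetries are placeholders here, not necessarily the PR's actual members:

    // Hypothetical sketch: retry a single persist step, but stop if the processor is dying.
    void persistWithRetry(CheckedRunnable<JobResultsPersister.BulkIndexException> persistCall) {
        int attempts = 0;
        while (true) {
            try {
                persistCall.run();
                return;
            } catch (JobResultsPersister.BulkIndexException e) {
                attempts++;
                if (isDeadOrDying() || attempts > maximumFailureRetries) {
                    // Re-throw so the caller can fail the job rather than loop forever.
                    throw new ElasticsearchException("[" + jobId + "] failed to persist results after [" + attempts + "] attempts", e);
                }
            }
        }
    }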
…cessing does not get skipped
Another thought: now with the bulk retries, the bulk request items do not get cleared after a failure. I think we should probably clear them out; I am thinking within
@elasticmachine update branch
Thanks for taking this on.
I have made a few initial comments.
But as I was reading through the changes I realised this is more complicated than I thought. I'll have a closer look at the flush logic tomorrow, as that's an area that has the potential to completely lock up all processing if we get it wrong.
"xpack.ml.persist_results_max_retries", | ||
2, | ||
0, | ||
Integer.MAX_VALUE - 2, |
This should probably be lower, say 100.
@droberts195 yeah, I agree. I am also thinking the random sleep should probably be a random value between some minimum value and the current exponential backoff max.
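A rough sketch of what that could look like, reusing the MIN_RETRY_SLEEP_MILLIS / MAX_RETRY_SLEEP_MILLIS / MAX_RETRY_EXPONENT constants that appear in a later snippet (the random field is assumed to be a java.util.Random):

    // Exponential cap that grows with the attempt count, clamped to an upper bound...
    long uncappedMaxMillis = ((1L << Math.min(currentAttempt, MAX_RETRY_EXPONENT)) - 1) * 50L;
    int currentMaxMillis = (int) Math.min(uncappedMaxMillis, MAX_RETRY_SLEEP_MILLIS);
    // ...then sleep a random duration between the fixed minimum and that cap.
    int sleepMillis = MIN_RETRY_SLEEP_MILLIS
        + random.nextInt(Math.max(1, currentMaxMillis - MIN_RETRY_SLEEP_MILLIS));
    Thread.sleep(sleepMillis);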
@@ -71,6 +73,14 @@
 */
public class AutodetectResultProcessor {

    public static final Setting<Integer> PERSIST_RESULTS_MAX_RETRIES = Setting.intSetting(
        "xpack.ml.persist_results_max_retries",
We should think more about the name of this setting before release. I guess it's possible that indexing data frame analytics results could also fail and need to be retried. In this case we can keep the current setting name and use the same number of retries for both anomaly detection results and data frame analytics results.
But if in the long term we think this setting will only ever be used for anomaly detection results then we should change the name of the setting to xpack.ml.persist_anomaly_results_max_retries.
I am leaning towards using the same setting eventually for data frame analytics results and keeping the name as is. What do you think @dimitris-athanasiou?
It would definitely make sense to reuse this setting for data frame analytics.
@@ -310,6 +323,46 @@ void processResult(AutodetectResult result) {
        }
    }

    void bulkPersistWithRetry(CheckedRunnable<JobResultsPersister.BulkIndexException> bulkRunnable) {
        int attempts = 0;
        while (attempts < maximumFailureRetries) {
Shouldn't it be <=, because if retries is zero we still want to try once?
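As a concrete sketch of that fix (illustrative, not the final PR code): with <= and maximumFailureRetries set to 0, the loop body still runs exactly once.

    void bulkPersistWithRetry(CheckedRunnable<JobResultsPersister.BulkIndexException> bulkRunnable) {
        int attempts = 0;
        // <= rather than <: a setting of 0 retries still means one initial attempt.
        while (attempts <= maximumFailureRetries) {
            try {
                bulkRunnable.run();
                return;
            } catch (JobResultsPersister.BulkIndexException e) {
                attempts++;
            }
        }
        LOGGER.error("[{}] failed to bulk index results after [{}] attempts", jobId, attempts);
    }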
    double backOff = ((1 << attempts) - 1) / 2.0;
    Thread.sleep((int)(backOff * 100));
} catch (InterruptedException interrupt) {
    LOGGER.warn(
There should be a Thread.currentThread().interrupt(); in this catch block as well as the logging so that the fact this thread was interrupted is not forgotten.
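A minimal sketch of the suggested catch block (the log message wording is illustrative):

    } catch (InterruptedException interrupt) {
        // Restore the interrupt flag so code further up the stack can still see
        // that this thread was interrupted, then stop retrying.
        Thread.currentThread().interrupt();
        LOGGER.warn(new ParameterizedMessage("[{}] interrupted while waiting to retry bulk indexing", jobId), interrupt);
        return;
    }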
        LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result", jobId), e);
    }
}
bulkResultsPersister.clearBulkRequest();
Another thought is now with the bulk retries, the bulk request items do not get cleared after a failure.
I think we should also be removing successful items from the bulk request before retrying it, as that will also reduce the burden on the bulk threadpool. That will have to be done in the JobResultsPersister class.
I think we should also be removing successful items from the bulk request before retrying it
This is tricky. I will look into trying to filter out the successes. It may be doable because all the indexing requests contain a provided doc ID.
We discussed this in more detail on a call and came up with the following requirements:
(* retries should be added to data frame analytics in a separate PR - this one should just change anomaly detection)
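On the quoted suggestion above about dropping successful items before a retry: since every indexing request in the bulk carries an explicit doc ID, the filtering could look roughly like this (a sketch against the stock BulkRequest/BulkResponse API, not necessarily how the PR ends up doing it):

    // Collect the doc IDs that failed in the previous bulk response...
    Set<String> failedIds = Arrays.stream(bulkResponse.getItems())
        .filter(BulkItemResponse::isFailed)
        .map(BulkItemResponse::getId)
        .collect(Collectors.toSet());

    // ...and rebuild the bulk request keeping only those items for the retry.
    BulkRequest retryRequest = new BulkRequest();
    bulkRequest.requests().stream()
        .filter(request -> failedIds.contains(request.id()))
        .forEach(retryRequest::add);
    bulkRequest = retryRequest;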
client().admin()
    .cluster()
    .prepareUpdateSettings()
    .setTransientSettings(Settings.builder()
Does it have a potential of affecting other, unrelated tests?
See the @After clause. It sets all back to null.
I see this part. My question was more along the lines if it is possible that 2 test classes will share these cluster settings. But I guess it's not the case.
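For context, the cleanup that the @After clause refers to would look roughly like this; only the retry setting is shown, and the rest of the test's settings are omitted (sketch only):

    @After
    public void resetClusterSettings() {
        // Setting a transient setting to null removes the override,
        // so other test classes see the default again.
        client().admin().cluster().prepareUpdateSettings()
            .setTransientSettings(Settings.builder()
                .putNull("xpack.ml.persist_results_max_retries")
                .build())
            .get();
    }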
    currentMin = currentMax;
}
double backOff = ((1 << Math.min(currentAttempt, MAX_RETRY_EXPONENT)) - 1) / 2.0;
int max = (int)(backOff * 100);
There is undocumented subtlety here. backOff * 100 can be greater than Integer.MAX_VALUE, and then the cast to int of a double greater than Integer.MAX_VALUE will result in Integer.MAX_VALUE.
But it makes me wonder whether it would be clearer to just use:
int uncappedBackOff = ((1 << Math.min(currentAttempt, MAX_RETRY_EXPONENT)) - 1) * (100 / 2);
and change MAX_RETRY_EXPONENT to 24.
This avoids any subtlety with casting int to double and back again. Or if there is a really good reason to go via double, please comment it.
Rounding up and * 50 should work. Let me experiment.
private static final int MAX_RETRY_SLEEP_MILLIS = (int)Duration.ofMinutes(15).toMillis();
private static final int MIN_RETRY_SLEEP_MILLIS = 50;
// Having an exponent higher than this causes integer overflow
private static final int MAX_RETRY_EXPONENT = 29;
I think this will need changing to 24, otherwise the int will overflow when multiplied by 50.
jshell> int val = ((1 << 29) - 1) * (100 / 2);
val ==> 1073741774
As a long the answer is 26843545550 though, so 1073741774 is due to wrapping. Try int val = ((1 << 27) - 1) * (100 / 2);
If we're going to rely on wrapping then it would probably be clearer to just say if (currentAttempt > SOMETHING) { max = magic number }.
You are 100% right. I reduced to 24.
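Putting the two threads together, an overflow-free version of the cap could look roughly like this (a sketch mirroring the constants above):

    // Highest exponent used in the backoff calculation.
    // (1 << 24) - 1 = 16,777,215, and 16,777,215 * 50 = 838,860,750, which fits in an int.
    private static final int MAX_RETRY_EXPONENT = 24;

    int uncappedBackOffMillis = ((1 << Math.min(currentAttempt, MAX_RETRY_EXPONENT)) - 1) * 50;
    int maxSleepMillis = Math.min(uncappedBackOffMillis, MAX_RETRY_SLEEP_MILLIS);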
@elasticmachine update branch
…nt/elasticsearch into feature/ml-persist-results-retry
BulkResponse bulkIndexWithRetry(BulkRequest bulkRequest,
                                String jobId,
                                Supplier<Boolean> shouldRetry,
                                Consumer<String> msgHandler,
could the msgHandler do both: log and audit?
Sure, whatever the handler wants, but I do think it is important for results persister to log on its own.
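For example, a caller could pass a handler that does both, while the persister keeps its own logging (sketch; the auditor call assumes the usual warning(jobId, message) style of the ML auditor):

    // Hypothetical handler passed into bulkIndexWithRetry: surface retry messages
    // both in the node log and in the job's audit messages.
    Consumer<String> msgHandler = message -> {
        logger.warn("[{}] {}", jobId, message);
        auditor.warning(jobId, message);
    };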
LGTM
I think we should also add retries for the model state documents that get indexed by IndexingStateProcessor, but these can be added in a new PR.
LGTM
LGTM
@elasticmachine update branch
…elastic#49508) All results indexing now retries the number of times configured in `xpack.ml.persist_results_max_retries`. The retries are done with semi-random exponential backoff.
This adds a new setting that allows bulk indexing of results to be retried.
In theory, this should work just fine as the named pipes are bounded queues.
In the event of a retry:
Marking this as WIP as more digging through the failure paths needs to be done.
It would also be good to get a larger-scale test in place to verify the backpressure is sent all the way back to the datafeed without anything getting dropped.
closes #45711