Skip to content

Commit

Permalink
[Transform] improve irrecoverable error detection
Browse files Browse the repository at this point in the history
treat resource-not-found and illegal-argument exceptions as irrecoverable errors

relates elastic#50135
  • Loading branch information
Hendrik Muhs committed Feb 4, 2020
1 parent ba2810f commit 5d5f3ce
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
// Determine whether the failure is irrecoverable (transform should go into failed state) or not (transform increments
// the indexing failure counter
// and possibly retries)
Exception irrecoverableException = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(
Throwable irrecoverableException = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(
deduplicatedFailures.values()
);
if (irrecoverableException == null) {
Expand Down Expand Up @@ -373,7 +373,7 @@ private static String getBulkIndexDetailedFailureMessage(String prefix, Map<Stri
return failureMessage;
}

private static Exception decorateBulkIndexException(Exception irrecoverableException) {
private static Throwable decorateBulkIndexException(Throwable irrecoverableException) {
if (irrecoverableException instanceof MapperParsingException) {
return new TransformException(
"Destination index mappings are incompatible with the transform configuration.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,28 +484,30 @@ synchronized void handleFailure(Exception e) {
handleIrrecoverableBulkIndexingException((BulkIndexingException) unwrappedException);
} else if (unwrappedException instanceof IndexNotFoundException
|| unwrappedException instanceof AggregationResultUtils.AggregationExtractionException
|| unwrappedException instanceof TransformConfigReloadingException) {
failIndexer("task encountered irrecoverable failure: " + e.getMessage());
} else if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
failIndexer(
"task encountered more than "
+ context.getNumFailureRetries()
+ " failures; latest failure: "
+ ExceptionRootCauseFinder.getDetailedMessage(unwrappedException)
);
} else {
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);

auditor.warning(
getJobId(),
"Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger."
|| unwrappedException instanceof TransformConfigReloadingException
|| unwrappedException instanceof ResourceNotFoundException
|| unwrappedException instanceof IllegalArgumentException) {
failIndexer("task encountered irrecoverable failure: " + e.getMessage());
} else if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
failIndexer(
"task encountered more than "
+ context.getNumFailureRetries()
+ " failures; latest failure: "
+ ExceptionRootCauseFinder.getDetailedMessage(unwrappedException)
);
lastAuditedExceptionMessage = message;
} else {
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);

auditor.warning(
getJobId(),
"Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger."
);
lastAuditedExceptionMessage = message;
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.transform.utils;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
Expand Down Expand Up @@ -63,10 +64,13 @@ public static String getDetailedMessage(Throwable t) {
* @param failures a collection of bulk item responses
* @return The first exception considered irrecoverable if there are any, null if no irrecoverable exception found
*/
public static Exception getFirstIrrecoverableExceptionFromBulkResponses(Collection<BulkItemResponse> failures) {
public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collection<BulkItemResponse> failures) {
for (BulkItemResponse failure : failures) {
if (failure.getFailure().getCause() instanceof MapperParsingException) {
return failure.getFailure().getCause();
Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(failure.getFailure().getCause());
if (unwrappedThrowable instanceof MapperParsingException
|| unwrappedThrowable instanceof IllegalArgumentException
|| unwrappedThrowable instanceof ResourceNotFoundException) {
return unwrappedThrowable;
}
}

Expand Down

0 comments on commit 5d5f3ce

Please sign in to comment.