Skip to content

Commit

Permalink
Handle EMR Exceptions in FlintCancelJob Operation
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Mar 22, 2024
1 parent 4dc83b7 commit e35d065
Show file tree
Hide file tree
Showing 13 changed files with 1,151 additions and 891 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ public interface EMRServerlessClient {
* @param jobId jobId.
* @return {@link CancelJobRunResult}
*/
CancelJobRunResult cancelJobRun(String applicationId, String jobId);
CancelJobRunResult cancelJobRun(
String applicationId, String jobId, boolean allowExceptionPropagation);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class EmrServerlessClientImpl implements EMRServerlessClient {

private static final int MAX_JOB_NAME_LENGTH = 255;

private static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";
public static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";

public EmrServerlessClientImpl(AWSEMRServerless emrServerless) {
this.emrServerless = emrServerless;
Expand Down Expand Up @@ -98,7 +98,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
}

@Override
public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
public CancelJobRunResult cancelJobRun(
String applicationId, String jobId, boolean allowExceptionPropagation) {
CancelJobRunRequest cancelJobRunRequest =
new CancelJobRunRequest().withJobRunId(jobId).withApplicationId(applicationId);
CancelJobRunResult cancelJobRunResult =
Expand All @@ -108,10 +109,14 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
try {
return emrServerless.cancelJobRun(cancelJobRunRequest);
} catch (Throwable t) {
logger.error("Error while making cancel job request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
if (allowExceptionPropagation) {
throw t;
} else {
logger.error("Error while making cancel job request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
}
});
logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJob
@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
emrServerlessClient.cancelJobRun(
asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId());
asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId(), false);
return asyncQueryJobMetadata.getQueryId().getId();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public void close() {
if (model.isEmpty()) {
throw new IllegalStateException("session does not exist. " + sessionModel.getSessionId());
} else {
serverlessClient.cancelJobRun(sessionModel.getApplicationId(), sessionModel.getJobId());
serverlessClient.cancelJobRun(
sessionModel.getApplicationId(), sessionModel.getJobId(), false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

package org.opensearch.sql.spark.flint.operation;

import static org.opensearch.sql.spark.client.EmrServerlessClientImpl.GENERIC_INTERNAL_SERVER_ERROR_MESSAGE;
import static org.opensearch.sql.spark.execution.statestore.StateStore.deleteFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.updateFlintIndexState;

import com.amazonaws.services.emrserverless.model.ValidationException;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -145,11 +147,18 @@ public void cancelStreamingJob(
String jobId = flintIndexStateModel.getJobId();
try {
emrServerlessClient.cancelJobRun(
flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId());
} catch (IllegalArgumentException e) {
// handle job does not exist case.
flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId(), true);
} catch (ValidationException e) {
// Exception when the job is not in cancellable state and already in terminal state.
if (e.getMessage().contains("Job run is not in a cancellable state")) {
LOG.error(e);
return;
} else {
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
} catch (Exception e) {
LOG.error(e);
return;
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}

// pull job state until timeout or cancelled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
}

@Override
public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
public CancelJobRunResult cancelJobRun(
String applicationId, String jobId, boolean allowExceptionPropagation) {
cancelJobRunCalled++;
return new CancelJobRunResult().withJobRunId(jobId);
}
Expand Down
Loading

0 comments on commit e35d065

Please sign in to comment.