Skip to content

Commit

Permalink
Handle EMR Exceptions in FlintCancelJob Operation
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Mar 22, 2024
1 parent 4dc83b7 commit a23a89e
Show file tree
Hide file tree
Showing 8 changed files with 950 additions and 864 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,13 @@ public interface EMRServerlessClient {
* @return {@link CancelJobRunResult}
*/
CancelJobRunResult cancelJobRun(String applicationId, String jobId);

/**
* Cancel emr serverless job run.
*
* @param applicationId applicationId.
* @param jobId jobId.
* @return {@link CancelJobRunResult}
*/
CancelJobRunResult cancelJobRunWithNativeEMRException(String applicationId, String jobId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class EmrServerlessClientImpl implements EMRServerlessClient {

private static final int MAX_JOB_NAME_LENGTH = 255;

private static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";
public static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";

public EmrServerlessClientImpl(AWSEMRServerless emrServerless) {
this.emrServerless = emrServerless;
Expand Down Expand Up @@ -117,4 +117,16 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId()));
return cancelJobRunResult;
}

@Override
public CancelJobRunResult cancelJobRunWithNativeEMRException(String applicationId, String jobId) {
CancelJobRunRequest cancelJobRunRequest =
new CancelJobRunRequest().withJobRunId(jobId).withApplicationId(applicationId);
CancelJobRunResult cancelJobRunResult =
AccessController.doPrivileged(
(PrivilegedAction<CancelJobRunResult>)
() -> emrServerless.cancelJobRun(cancelJobRunRequest));
logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId()));
return cancelJobRunResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@

package org.opensearch.sql.spark.flint.operation;

import static org.opensearch.sql.spark.client.EmrServerlessClientImpl.GENERIC_INTERNAL_SERVER_ERROR_MESSAGE;
import static org.opensearch.sql.spark.execution.statestore.StateStore.deleteFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.updateFlintIndexState;

import com.amazonaws.services.emrserverless.model.ResourceNotFoundException;
import com.amazonaws.services.emrserverless.model.ValidationException;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -144,12 +147,19 @@ public void cancelStreamingJob(
String applicationId = flintIndexStateModel.getApplicationId();
String jobId = flintIndexStateModel.getJobId();
try {
emrServerlessClient.cancelJobRun(
emrServerlessClient.cancelJobRunWithNativeEMRException(
flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId());
} catch (IllegalArgumentException e) {
// handle job does not exist case.
} catch (ResourceNotFoundException e) {
// JobId Not Found Exception.
LOG.error(e);
return;
} catch (ValidationException e) {
// Exception when the job is not in cancellable state and already in terminal state.
LOG.error(e);
return;
} catch (Exception e) {
LOG.error(e);
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}

// pull job state until timeout or cancelled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,12 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
return new CancelJobRunResult().withJobRunId(jobId);
}

@Override
public CancelJobRunResult cancelJobRunWithNativeEMRException(
String applicationId, String jobId) {
return null;
}

public void startJobRunCalled(int expectedTimes) {
assertEquals(expectedTimes, startJobRunCalled);
}
Expand Down
Loading

0 comments on commit a23a89e

Please sign in to comment.