From a23a89efa7dc5ec3100d75c9b5a95e949f885719 Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Tue, 19 Mar 2024 21:44:49 -0700 Subject: [PATCH] Handle EMR Exceptions in FlintCancelJob Operation Signed-off-by: Vamsi Manohar --- .../sql/spark/client/EMRServerlessClient.java | 9 + .../spark/client/EmrServerlessClientImpl.java | 14 +- .../spark/flint/operation/FlintIndexOp.java | 16 +- .../AsyncQueryExecutorServiceSpec.java | 6 + .../asyncquery/IndexQuerySpecAlterTest.java | 880 ++++++++++++++++++ .../spark/asyncquery/IndexQuerySpecTest.java | 860 ----------------- .../client/EmrServerlessClientImplTest.java | 23 + .../session/InteractiveSessionTest.java | 6 + 8 files changed, 950 insertions(+), 864 deletions(-) create mode 100644 spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecAlterTest.java diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClient.java b/spark/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClient.java index 7e64b632ea..af9602e1eb 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClient.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClient.java @@ -42,4 +42,13 @@ public interface EMRServerlessClient { * @return {@link CancelJobRunResult} */ CancelJobRunResult cancelJobRun(String applicationId, String jobId); + + /** + * Cancel emr serverless job run and let EMR exceptions reach the caller as-is. + * + * @param applicationId EMR Serverless application id. + * @param jobId id of the job run to cancel. 
+ * @return {@link CancelJobRunResult} + */ + CancelJobRunResult cancelJobRunWithNativeEMRException(String applicationId, String jobId); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java index 3a47eb21a7..bb58e02b32 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java @@ -32,7 +32,7 @@ public class EmrServerlessClientImpl implements EMRServerlessClient { private static final int MAX_JOB_NAME_LENGTH = 255; - private static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error."; + public static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error."; public EmrServerlessClientImpl(AWSEMRServerless emrServerless) { this.emrServerless = emrServerless; @@ -117,4 +117,16 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId())); return cancelJobRunResult; } + + @Override + public CancelJobRunResult cancelJobRunWithNativeEMRException(String applicationId, String jobId) { + CancelJobRunRequest cancelJobRunRequest = + new CancelJobRunRequest().withJobRunId(jobId).withApplicationId(applicationId); + CancelJobRunResult cancelJobRunResult = + AccessController.doPrivileged( + (PrivilegedAction) + () -> emrServerless.cancelJobRun(cancelJobRunRequest)); + logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId())); + return cancelJobRunResult; + } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java index 0e99c18eef..d17a1e95e3 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java +++ 
b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java @@ -5,10 +5,13 @@ package org.opensearch.sql.spark.flint.operation; +import static org.opensearch.sql.spark.client.EmrServerlessClientImpl.GENERIC_INTERNAL_SERVER_ERROR_MESSAGE; import static org.opensearch.sql.spark.execution.statestore.StateStore.deleteFlintIndexState; import static org.opensearch.sql.spark.execution.statestore.StateStore.getFlintIndexState; import static org.opensearch.sql.spark.execution.statestore.StateStore.updateFlintIndexState; +import com.amazonaws.services.emrserverless.model.ResourceNotFoundException; +import com.amazonaws.services.emrserverless.model.ValidationException; import java.util.Locale; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -144,12 +147,19 @@ public void cancelStreamingJob( String applicationId = flintIndexStateModel.getApplicationId(); String jobId = flintIndexStateModel.getJobId(); try { - emrServerlessClient.cancelJobRun( + emrServerlessClient.cancelJobRunWithNativeEMRException( flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId()); - } catch (IllegalArgumentException e) { - // handle job does not exist case. + } catch (ResourceNotFoundException e) { + // EMR does not know this job run, so there is nothing to cancel or poll. LOG.error(e); return; + } catch (ValidationException e) { + // The job run already reached a terminal state and can no longer be cancelled. + LOG.error(e); + return; + } catch (Exception e) { + LOG.error(e); + throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE); } // pull job state until timeout or cancelled. 
diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index c064067e26..0158e044e2 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -248,6 +248,12 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { return new CancelJobRunResult().withJobRunId(jobId); } + @Override + public CancelJobRunResult cancelJobRunWithNativeEMRException( + String applicationId, String jobId) { + return cancelJobRun(applicationId, jobId); // same canned result as cancelJobRun, not null + } + public void startJobRunCalled(int expectedTimes) { assertEquals(expectedTimes, startJobRunCalled); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecAlterTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecAlterTest.java new file mode 100644 index 0000000000..b991dbcbcd --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecAlterTest.java @@ -0,0 +1,880 @@ +package org.opensearch.sql.spark.asyncquery; + +import com.amazonaws.services.emrserverless.model.CancelJobRunResult; +import com.amazonaws.services.emrserverless.model.GetJobRunResult; +import com.amazonaws.services.emrserverless.model.JobRun; +import com.google.common.collect.ImmutableList; +import java.util.HashMap; +import java.util.Map; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; +import org.opensearch.sql.spark.asyncquery.model.MockFlintIndex; +import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.client.StartJobRequest; +import org.opensearch.sql.spark.flint.FlintIndexState; +import 
org.opensearch.sql.spark.flint.FlintIndexType; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; +import org.opensearch.sql.spark.rest.model.LangType; + +public class IndexQuerySpecAlterTest extends AsyncQueryExecutorServiceSpec { + + @Test + public void testAlterIndexQueryConvertingToManualRefresh() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=false) "); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new 
MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(1); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryConvertingToManualRefreshWithNoIncrementalRefresh() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false)"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false)"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false)"); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + 
EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + existingOptions.put("checkpoint_location", "s3://checkpoint/location"); + mockDS.updateIndexOptions(existingOptions, true); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(1); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryWithRedundantOperation() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( 
+ client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=false) "); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public String startJobRun(StartJobRequest startJobRequest) { + return "jobId"; + } + + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + super.cancelJobRun(applicationId, jobId); + throw new IllegalArgumentException("JobId doesn't exist"); + } + }; + EMRServerlessClientFactory emrServerlessCientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessCientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "false"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. 
fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(1); + emrsClient.getJobRunResultCalled(0); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryConvertingToAutoRefresh() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=true," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=true," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=true," + + " incremental_refresh=false) "); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient localEMRSClient = new LocalEMRSClient(); + EMRServerlessClientFactory clientFactory = () -> localEMRSClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(clientFactory); + + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "false"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new 
MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + assertEquals( + "RUNNING", + asyncQueryExecutorService + .getAsyncQueryResults(response.getQueryId()) + .getStatus()); + + flintIndexJob.assertState(FlintIndexState.ACTIVE); + localEMRSClient.startJobRunCalled(1); + localEMRSClient.getJobRunResultCalled(1); + localEMRSClient.cancelJobRunCalled(0); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryWithOutAnyAutoRefresh() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (" + + " incremental_refresh=false)"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (" + + " incremental_refresh=false)"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (" + " incremental_refresh=false) "); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient localEMRSClient = new LocalEMRSClient(); + EMRServerlessClientFactory clientFactory = () -> localEMRSClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(clientFactory); + + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + 
existingOptions.put("auto_refresh", "false"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + assertEquals( + "RUNNING", + asyncQueryExecutorService + .getAsyncQueryResults(response.getQueryId()) + .getStatus()); + + flintIndexJob.assertState(FlintIndexState.ACTIVE); + localEMRSClient.startJobRunCalled(1); + localEMRSClient.getJobRunResultCalled(1); + localEMRSClient.cancelJobRunCalled(0); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryOfFullRefreshWithInvalidOptions() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false, checkpoint_location=\"s3://ckp/skp\")"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false, checkpoint_location=\"s3://ckp/skp\")"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=false, checkpoint_location=\"s3://ckp/skp\") "); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + 
mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. 
fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); + assertEquals( + "Altering to full refresh only allows: [auto_refresh, incremental_refresh]" + + " options", + asyncQueryExecutionResponse.getError()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(0); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryOfIncrementalRefreshWithInvalidOptions() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\") "); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } 
+ }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); + assertEquals( + "Altering to incremental refresh only allows: [auto_refresh, incremental_refresh," + + " watermark_delay, checkpoint_location] options", + asyncQueryExecutionResponse.getError()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(0); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryOfIncrementalRefreshWithInsufficientOptions() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true)"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER 
INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true)"); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + existingOptions.put("incremental_refresh", "false"); + mockDS.updateIndexOptions(existingOptions, true); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. 
fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); + assertEquals( + "Conversion to incremental refresh index cannot proceed due to missing" + + " attributes: checkpoint_location.", + asyncQueryExecutionResponse.getError()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(0); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryOfIncrementalRefreshWithInsufficientOptionsForMV() { + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=true) "); + ImmutableList.of(ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + existingOptions.put("incremental_refresh", "false"); + mockDS.updateIndexOptions(existingOptions, true); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + 
flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); + assertEquals( + "Conversion to incremental refresh index cannot proceed due to missing" + + " attributes: checkpoint_location, watermark_delay.", + asyncQueryExecutionResponse.getError()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(0); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryOfIncrementalRefreshWithEmptyExistingOptionsForMV() { + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=true) "); + ImmutableList.of(ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); 
+ existingOptions.put("incremental_refresh", "false"); + existingOptions.put("watermark_delay", ""); + existingOptions.put("checkpoint_location", ""); + mockDS.updateIndexOptions(existingOptions, true); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); + assertEquals( + "Conversion to incremental refresh index cannot proceed due to missing" + + " attributes: checkpoint_location, watermark_delay.", + asyncQueryExecutionResponse.getError()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(0); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); + } + + @Test + public void testAlterIndexQueryOfIncrementalRefresh() { + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=true) "); + ImmutableList.of(ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + 
EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + existingOptions.put("incremental_refresh", "false"); + existingOptions.put("watermark_delay", "watermark_delay"); + existingOptions.put("checkpoint_location", "s3://checkpoint/location"); + mockDS.updateIndexOptions(existingOptions, true); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.refreshing(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); + emrsClient.startJobRunCalled(0); + emrsClient.getJobRunResultCalled(1); + emrsClient.cancelJobRunCalled(1); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + Assertions.assertEquals("true", options.get("incremental_refresh")); + }); + } + + @Test + public void testAlterIndexQueryWithIncrementalRefreshAlreadyExisting() { + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false) "); + ImmutableList.of(ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + 
@Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + existingOptions.put("incremental_refresh", "true"); + existingOptions.put("watermark_delay", "watermark_delay"); + existingOptions.put("checkpoint_location", "s3://checkpoint/location"); + mockDS.updateIndexOptions(existingOptions, true); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.refreshing(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. 
fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); + emrsClient.startJobRunCalled(0); + emrsClient.getJobRunResultCalled(1); + emrsClient.cancelJobRunCalled(1); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + Assertions.assertEquals("true", options.get("incremental_refresh")); + }); + } + + @Test + public void testAlterIndexQueryWithInvalidInitialState() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + ImmutableList.of(ALTER_SKIPPING) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + mockDS.updateIndexOptions(existingOptions, false); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + flintIndexJob.updating(); + + // 1. 
alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryExecutionResponse = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); + assertEquals( + "Transaction failed as flint index is not in a valid state.", + asyncQueryExecutionResponse.getError()); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(0); + flintIndexJob.assertState(FlintIndexState.UPDATING); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java index ff262c24c0..d5084722fe 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java @@ -9,8 +9,6 @@ import com.amazonaws.services.emrserverless.model.GetJobRunResult; import com.amazonaws.services.emrserverless.model.JobRun; import com.google.common.collect.ImmutableList; -import java.util.HashMap; -import java.util.Map; import org.junit.Assert; import org.junit.Test; import org.junit.jupiter.api.Assertions; @@ -18,7 +16,6 @@ import org.opensearch.sql.spark.asyncquery.model.MockFlintIndex; import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; -import org.opensearch.sql.spark.client.StartJobRequest; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexType; import 
org.opensearch.sql.spark.leasemanager.ConcurrencyLimitExceededException; @@ -1003,861 +1000,4 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { IllegalStateException.class, () -> asyncQueryExecutorService.cancelQuery(response.getQueryId())); } - - @Test - public void testAlterIndexQueryConvertingToManualRefresh() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=false)"); - MockFlintIndex ALTER_COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=false)"); - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=false) "); - ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - mockDS.updateIndexOptions(existingOptions, false); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), 
MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(1); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryConvertingToManualRefreshWithNoIncrementalRefresh() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false)"); - MockFlintIndex ALTER_COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false)"); - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false)"); - ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () 
-> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - existingOptions.put("checkpoint_location", "s3://checkpoint/location"); - mockDS.updateIndexOptions(existingOptions, true); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(1); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryWithRedundantOperation() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=false)"); - MockFlintIndex ALTER_COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=false)"); - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - 
FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=false) "); - ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public String startJobRun(StartJobRequest startJobRequest) { - return "jobId"; - } - - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - - @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { - super.cancelJobRun(applicationId, jobId); - throw new IllegalArgumentException("JobId doesn't exist"); - } - }; - EMRServerlessClientFactory emrServerlessCientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessCientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "false"); - mockDS.updateIndexOptions(existingOptions, false); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. 
fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(1); - emrsClient.getJobRunResultCalled(0); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryConvertingToAutoRefresh() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=true," - + " incremental_refresh=false)"); - MockFlintIndex ALTER_COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=true," - + " incremental_refresh=false)"); - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=true," - + " incremental_refresh=false) "); - ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient localEMRSClient = new LocalEMRSClient(); - EMRServerlessClientFactory clientFactory = () -> localEMRSClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(clientFactory); - - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "false"); - mockDS.updateIndexOptions(existingOptions, false); - // Mock index state - MockFlintSparkJob flintIndexJob = - new 
MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - assertEquals( - "RUNNING", - asyncQueryExecutorService - .getAsyncQueryResults(response.getQueryId()) - .getStatus()); - - flintIndexJob.assertState(FlintIndexState.ACTIVE); - localEMRSClient.startJobRunCalled(1); - localEMRSClient.getJobRunResultCalled(1); - localEMRSClient.cancelJobRunCalled(0); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryWithOutAnyAutoRefresh() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (" - + " incremental_refresh=false)"); - MockFlintIndex ALTER_COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (" - + " incremental_refresh=false)"); - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (" + " incremental_refresh=false) "); - ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient localEMRSClient = new LocalEMRSClient(); - EMRServerlessClientFactory clientFactory = () -> localEMRSClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(clientFactory); - - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - 
existingOptions.put("auto_refresh", "false"); - mockDS.updateIndexOptions(existingOptions, false); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - assertEquals( - "RUNNING", - asyncQueryExecutorService - .getAsyncQueryResults(response.getQueryId()) - .getStatus()); - - flintIndexJob.assertState(FlintIndexState.ACTIVE); - localEMRSClient.startJobRunCalled(1); - localEMRSClient.getJobRunResultCalled(1); - localEMRSClient.cancelJobRunCalled(0); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryOfFullRefreshWithInvalidOptions() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=false, checkpoint_location=\"s3://ckp/skp\")"); - MockFlintIndex ALTER_COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=false, checkpoint_location=\"s3://ckp/skp\")"); - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=false, checkpoint_location=\"s3://ckp/skp\") "); - ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) - .forEach( - 
mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - mockDS.updateIndexOptions(existingOptions, false); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. 
fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); - assertEquals( - "Altering to full refresh only allows: [auto_refresh, incremental_refresh]" - + " options", - asyncQueryExecutionResponse.getError()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(0); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryOfIncrementalRefreshWithInvalidOptions() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex ALTER_COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\") "); - ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } 
- }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - mockDS.updateIndexOptions(existingOptions, false); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); - assertEquals( - "Altering to incremental refresh only allows: [auto_refresh, incremental_refresh," - + " watermark_delay, checkpoint_location] options", - asyncQueryExecutionResponse.getError()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(0); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryOfIncrementalRefreshWithInsufficientOptions() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true)"); - MockFlintIndex ALTER_COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER 
INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true)"); - ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - existingOptions.put("incremental_refresh", "false"); - mockDS.updateIndexOptions(existingOptions, true); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. 
fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); - assertEquals( - "Conversion to incremental refresh index cannot proceed due to missing" - + " attributes: checkpoint_location.", - asyncQueryExecutionResponse.getError()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(0); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryOfIncrementalRefreshWithInsufficientOptionsForMV() { - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=true) "); - ImmutableList.of(ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - existingOptions.put("incremental_refresh", "false"); - mockDS.updateIndexOptions(existingOptions, true); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - 
flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); - assertEquals( - "Conversion to incremental refresh index cannot proceed due to missing" - + " attributes: checkpoint_location, watermark_delay.", - asyncQueryExecutionResponse.getError()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(0); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryOfIncrementalRefreshWithEmptyExistingOptionsForMV() { - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=true) "); - ImmutableList.of(ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); 
- existingOptions.put("incremental_refresh", "false"); - existingOptions.put("watermark_delay", ""); - existingOptions.put("checkpoint_location", ""); - mockDS.updateIndexOptions(existingOptions, true); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.active(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); - assertEquals( - "Conversion to incremental refresh index cannot proceed due to missing" - + " attributes: checkpoint_location, watermark_delay.", - asyncQueryExecutionResponse.getError()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(0); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); - } - - @Test - public void testAlterIndexQueryOfIncrementalRefresh() { - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=true) "); - ImmutableList.of(ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - 
EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - existingOptions.put("incremental_refresh", "false"); - existingOptions.put("watermark_delay", "watermark_delay"); - existingOptions.put("checkpoint_location", "s3://checkpoint/location"); - mockDS.updateIndexOptions(existingOptions, true); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.refreshing(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); - emrsClient.startJobRunCalled(0); - emrsClient.getJobRunResultCalled(1); - emrsClient.cancelJobRunCalled(1); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - Assertions.assertEquals("true", options.get("incremental_refresh")); - }); - } - - @Test - public void testAlterIndexQueryWithIncrementalRefreshAlreadyExisting() { - MockFlintIndex ALTER_MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false) "); - ImmutableList.of(ALTER_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - 
@Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - existingOptions.put("incremental_refresh", "true"); - existingOptions.put("watermark_delay", "watermark_delay"); - existingOptions.put("checkpoint_location", "s3://checkpoint/location"); - mockDS.updateIndexOptions(existingOptions, true); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.refreshing(); - - // 1. alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. 
fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("SUCCESS", asyncQueryExecutionResponse.getStatus()); - emrsClient.startJobRunCalled(0); - emrsClient.getJobRunResultCalled(1); - emrsClient.cancelJobRunCalled(1); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - Assertions.assertEquals("true", options.get("incremental_refresh")); - }); - } - - @Test - public void testAlterIndexQueryWithInvalidInitialState() { - MockFlintIndex ALTER_SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=false)"); - ImmutableList.of(ALTER_SKIPPING) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - // Mock flint index - mockDS.createIndex(); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - mockDS.updateIndexOptions(existingOptions, false); - // Mock index state - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); - flintIndexJob.updating(); - - // 1. 
alter index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryExecutionResponse = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("FAILED", asyncQueryExecutionResponse.getStatus()); - assertEquals( - "Transaction failed as flint index is not in a valid state.", - asyncQueryExecutionResponse.getError()); - emrsClient.startJobRunCalled(0); - emrsClient.cancelJobRunCalled(0); - flintIndexJob.assertState(FlintIndexState.UPDATING); - Map mappings = mockDS.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); - } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java index a5123e0174..05901f5384 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java @@ -183,6 +183,29 @@ void testCancelJobRunWithValidationException() { Assertions.assertEquals("Internal Server Error.", runtimeException.getMessage()); } + @Test + void testCancelJobRunWithNativeEMRExceptionWithValidationException() { + doThrow(new ValidationException("Error")).when(emrServerless).cancelJobRun(any()); + EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + ValidationException validationException = + Assertions.assertThrows( + ValidationException.class, + () -> + emrServerlessClient.cancelJobRunWithNativeEMRException( + EMRS_APPLICATION_ID, EMR_JOB_ID)); + Assertions.assertTrue(validationException.getMessage().contains("Error")); + } + + @Test + void 
testCancelJobRunWithNativeEMRException() { + when(emrServerless.cancelJobRun(any())) + .thenReturn(new CancelJobRunResult().withJobRunId(EMR_JOB_ID)); + EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); + CancelJobRunResult cancelJobRunResult = + emrServerlessClient.cancelJobRunWithNativeEMRException(EMRS_APPLICATION_ID, EMR_JOB_ID); + Assertions.assertEquals(EMR_JOB_ID, cancelJobRunResult.getJobRunId()); + } + @Test void testStartJobRunWithLongJobName() { StartJobRunResult response = new StartJobRunResult(); diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java index 6112261336..aaae1c639a 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java @@ -232,6 +232,12 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { return null; } + @Override + public CancelJobRunResult cancelJobRunWithNativeEMRException( + String applicationId, String jobId) { + return null; + } + public void startJobRunCalled(int expectedTimes) { assertEquals(expectedTimes, startJobRunCalled); }