diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index 2a9231fc25..e2b7ab2904 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -48,7 +48,9 @@ public enum Key { "plugins.query.executionengine.spark.session_inactivity_timeout_millis"), /** Async query Settings * */ - ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled"); + ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled"), + STREAMING_JOB_HOUSEKEEPER_INTERVAL( + "plugins.query.executionengine.spark.streamingjobs.housekeeper.interval"); @Getter private final String keyValue; diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java b/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java index 6659e54342..eeb0302ed0 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java @@ -165,6 +165,7 @@ public void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata) { public void deleteDataSourceMetadata(String datasourceName) { DeleteRequest deleteRequest = new DeleteRequest(DATASOURCE_INDEX_NAME); deleteRequest.id(datasourceName); + deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); ActionFuture deleteResponseActionFuture; try (ThreadContext.StoredContext storedContext = client.threadPool().getThreadContext().stashContext()) { diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index c1a7a4eb8b..ce62f7be4c 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -595,3 +595,38 @@ Request:: } } +plugins.query.executionengine.spark.streamingjobs.housekeeper.interval +=============================== + +Description +----------- +This setting specifies the interval at which the streaming job housekeeper runs to clean up streaming jobs associated with deleted and disabled data sources. +The default configuration executes this cleanup every 15 minutes. + +* Default Value: 15 minutes + +To modify the TTL to 30 minutes for example, use this command: + +Request :: + + sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \ + ... -d '{"transient":{"plugins.query.executionengine.spark.streamingjobs.housekeeper.interval":"30m"}}' + { + "acknowledged": true, + "persistent": {}, + "transient": { + "plugins": { + "query": { + "executionengine": { + "spark": { + "streamingjobs": { + "housekeeper": { + "interval": "30m" + } + } + } + } + } + } + } + } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java index 91ade7b038..72960944b6 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java @@ -47,7 +47,8 @@ public enum MetricName { EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT("emr_cancel_job_request_failure_count"), EMR_STREAMING_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"), EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT("emr_interactive_jobs_creation_count"), - EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count"); + EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count"), + STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT("streaming_job_housekeeper_task_failure_count"); private String name; @@ -91,6 +92,7 @@ public static List getNames() { .add(ASYNC_QUERY_CREATE_API_REQUEST_COUNT) .add(ASYNC_QUERY_GET_API_REQUEST_COUNT) .add(ASYNC_QUERY_CANCEL_API_REQUEST_COUNT) + .add(STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT) .build(); public boolean isNumerical() { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index 159b37309e..b03852be53 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -7,6 +7,7 @@ import static org.opensearch.common.settings.Settings.EMPTY; import static org.opensearch.common.unit.TimeValue.timeValueDays; +import static org.opensearch.common.unit.TimeValue.timeValueMinutes; import static org.opensearch.sql.common.setting.Settings.Key.ENCYRPTION_MASTER_KEY; import com.google.common.annotations.VisibleForTesting; @@ -193,6 +194,13 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING = + Setting.positiveTimeSetting( + Key.STREAMING_JOB_HOUSEKEEPER_INTERVAL.getKeyValue(), + timeValueMinutes(15), + Setting.Property.NodeScope, + Setting.Property.Dynamic); + /** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */ @SuppressWarnings("unchecked") public OpenSearchSettings(ClusterSettings clusterSettings) { @@ -313,6 +321,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.SESSION_INACTIVITY_TIMEOUT_MILLIS, SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING, new Updater((Key.SESSION_INACTIVITY_TIMEOUT_MILLIS))); + register( + settingBuilder, + clusterSettings, + Key.STREAMING_JOB_HOUSEKEEPER_INTERVAL, + STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING, + new Updater((Key.STREAMING_JOB_HOUSEKEEPER_INTERVAL))); defaultSettings = settingBuilder.build(); } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 2b75a8b2c9..08386b797e 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -79,7 +79,10 @@ import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.cluster.ClusterManagerEventListener; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction; import org.opensearch.sql.spark.storage.SparkStorageFactory; import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction; @@ -220,8 +223,13 @@ public Collection createComponents( Clock.systemUTC(), OpenSearchSettings.SESSION_INDEX_TTL_SETTING, OpenSearchSettings.RESULT_INDEX_TTL_SETTING, + OpenSearchSettings.STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING, OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING, - environment.settings()); + environment.settings(), + dataSourceService, + injector.getInstance(FlintIndexMetadataServiceImpl.class), + injector.getInstance(StateStore.class), + injector.getInstance(EMRServerlessClientFactory.class)); return ImmutableList.of( dataSourceService, injector.getInstance(AsyncQueryExecutorService.class), diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java index 3d004b548f..8f38583b3f 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java +++ b/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java @@ -19,19 +19,29 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.threadpool.Scheduler.Cancellable; import org.opensearch.threadpool.ThreadPool; public class ClusterManagerEventListener implements LocalNodeClusterManagerListener { private Cancellable flintIndexRetentionCron; + private Cancellable flintStreamingJobHouseKeeperCron; private ClusterService clusterService; private ThreadPool threadPool; private Client client; private Clock clock; + private DataSourceService dataSourceService; + private FlintIndexMetadataService flintIndexMetadataService; + private StateStore stateStore; + private EMRServerlessClientFactory emrServerlessClientFactory; private Duration sessionTtlDuration; private Duration resultTtlDuration; + private TimeValue streamingJobHouseKeepingInterval; private boolean isAutoIndexManagementEnabled; public ClusterManagerEventListener( @@ -41,16 +51,25 @@ public ClusterManagerEventListener( Clock clock, Setting sessionTtl, Setting resultTtl, + Setting streamingJobHouseKeepingInterval, Setting isAutoIndexManagementEnabledSetting, - Settings settings) { + Settings settings, + DataSourceService dataSourceService, + FlintIndexMetadataService flintIndexMetadataService, + StateStore stateStore, + EMRServerlessClientFactory emrServerlessClientFactory) { this.clusterService = clusterService; this.threadPool = threadPool; this.client = client; this.clusterService.addLocalNodeClusterManagerListener(this); this.clock = clock; - + this.dataSourceService = dataSourceService; + this.flintIndexMetadataService = flintIndexMetadataService; + this.stateStore = stateStore; + this.emrServerlessClientFactory = emrServerlessClientFactory; this.sessionTtlDuration = toDuration(sessionTtl.get(settings)); this.resultTtlDuration = toDuration(resultTtl.get(settings)); + this.streamingJobHouseKeepingInterval = streamingJobHouseKeepingInterval.get(settings); clusterService .getClusterSettings() @@ -87,6 +106,16 @@ public ClusterManagerEventListener( } } }); + + clusterService + .getClusterSettings() + .addSettingsUpdateConsumer( + streamingJobHouseKeepingInterval, + it -> { + this.streamingJobHouseKeepingInterval = it; + cancel(flintStreamingJobHouseKeeperCron); + initializeStreamingJobHouseKeeperCron(); + }); } @Override @@ -104,6 +133,19 @@ public void beforeStop() { } }); } + initializeStreamingJobHouseKeeperCron(); + } + + private void initializeStreamingJobHouseKeeperCron() { + flintStreamingJobHouseKeeperCron = + threadPool.scheduleWithFixedDelay( + new FlintStreamingJobHouseKeeperTask( + dataSourceService, + flintIndexMetadataService, + stateStore, + emrServerlessClientFactory), + streamingJobHouseKeepingInterval, + executorName()); } private void reInitializeFlintIndexRetention() { @@ -125,6 +167,8 @@ private void reInitializeFlintIndexRetention() { public void offClusterManager() { cancel(flintIndexRetentionCron); flintIndexRetentionCron = null; + cancel(flintStreamingJobHouseKeeperCron); + flintStreamingJobHouseKeeperCron = null; } private void cancel(Cancellable cron) { diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTask.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTask.java new file mode 100644 index 0000000000..10e6f20cdc --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTask.java @@ -0,0 +1,134 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.cluster; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceStatus; +import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop; + +/** Cleaner task which alters the active streaming jobs of a disabled datasource. */ +@RequiredArgsConstructor +public class FlintStreamingJobHouseKeeperTask implements Runnable { + + private final DataSourceService dataSourceService; + private final FlintIndexMetadataService flintIndexMetadataService; + private final StateStore stateStore; + private final EMRServerlessClientFactory emrServerlessClientFactory; + + private static final Logger LOGGER = LogManager.getLogger(FlintStreamingJobHouseKeeperTask.class); + protected static final AtomicBoolean isRunning = new AtomicBoolean(false); + + @Override + public void run() { + if (!isRunning.compareAndSet(false, true)) { + LOGGER.info("Previous task is still running. Skipping this execution."); + return; + } + try { + LOGGER.info("Starting the cleaner task for disabled and deleted data sources."); + Map autoRefreshFlintIndicesMap = getAllAutoRefreshIndices(); + autoRefreshFlintIndicesMap.forEach( + (autoRefreshIndex, flintIndexMetadata) -> { + try { + String datasourceName = getDataSourceName(flintIndexMetadata); + try { + DataSourceMetadata dataSourceMetadata = + this.dataSourceService.getDataSourceMetadata(datasourceName); + if (dataSourceMetadata.getStatus() == DataSourceStatus.DISABLED) { + LOGGER.info("Datasource is disabled for autoRefreshIndex: {}", autoRefreshIndex); + alterAutoRefreshIndex(autoRefreshIndex, flintIndexMetadata, datasourceName); + } else { + LOGGER.debug("Datasource is enabled for autoRefreshIndex : {}", autoRefreshIndex); + } + } catch (DataSourceNotFoundException exception) { + // Datasource disabled. + LOGGER.info("Datasource is deleted for autoRefreshIndex: {}", autoRefreshIndex); + dropAutoRefreshIndex(autoRefreshIndex, flintIndexMetadata, datasourceName); + } + } catch (Exception exception) { + LOGGER.error( + "Failed to alter/cancel index {}: {}", + autoRefreshIndex, + exception.getMessage(), + exception); + Metrics.getInstance() + .getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT) + .increment(); + } + }); + } catch (Throwable error) { + LOGGER.error("Error while running the streaming job cleaner task: {}", error.getMessage()); + Metrics.getInstance() + .getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT) + .increment(); + } finally { + isRunning.set(false); + } + } + + private void dropAutoRefreshIndex( + String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata, String datasourceName) { + // When the datasource is deleted. Possibly Replace with VACUUM Operation. + LOGGER.info("Attempting to drop auto refresh index: {}", autoRefreshIndex); + FlintIndexOpDrop flintIndexOpDrop = + new FlintIndexOpDrop(stateStore, datasourceName, emrServerlessClientFactory.getClient()); + flintIndexOpDrop.apply(flintIndexMetadata); + LOGGER.info("Successfully dropped index: {}", autoRefreshIndex); + } + + private void alterAutoRefreshIndex( + String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata, String datasourceName) { + LOGGER.info("Attempting to alter index: {}", autoRefreshIndex); + FlintIndexOptions flintIndexOptions = new FlintIndexOptions(); + flintIndexOptions.setOption(FlintIndexOptions.AUTO_REFRESH, "false"); + FlintIndexOpAlter flintIndexOpAlter = + new FlintIndexOpAlter( + flintIndexOptions, + stateStore, + datasourceName, + emrServerlessClientFactory.getClient(), + flintIndexMetadataService); + flintIndexOpAlter.apply(flintIndexMetadata); + LOGGER.info("Successfully altered index: {}", autoRefreshIndex); + } + + private String getDataSourceName(FlintIndexMetadata flintIndexMetadata) { + String kind = flintIndexMetadata.getKind(); + switch (kind) { + case "mv": + return flintIndexMetadata.getName().split("\\.")[0]; + case "skipping": + case "covering": + return flintIndexMetadata.getSource().split("\\.")[0]; + default: + throw new IllegalArgumentException(String.format("Unknown flint index kind: %s", kind)); + } + } + + private Map getAllAutoRefreshIndices() { + Map flintIndexMetadataHashMap = + flintIndexMetadataService.getFlintIndexMetadata("flint_*"); + return flintIndexMetadataHashMap.entrySet().stream() + .filter(entry -> entry.getValue().getFlintIndexOptions().autoRefresh()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java index 50ed17beb7..0b00e8390b 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java @@ -31,6 +31,9 @@ public class FlintIndexMetadata { private final String jobId; private final String appId; private final String latestId; + private final String kind; + private final String source; + private final String name; private final FlintIndexOptions flintIndexOptions; public Optional getLatestId() { diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImpl.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImpl.java index a70b1db9d2..893b33b39d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImpl.java @@ -11,11 +11,14 @@ import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.WATERMARK_DELAY; import static org.opensearch.sql.spark.flint.FlintIndexMetadata.APP_ID; import static org.opensearch.sql.spark.flint.FlintIndexMetadata.ENV_KEY; +import static org.opensearch.sql.spark.flint.FlintIndexMetadata.KIND_KEY; import static org.opensearch.sql.spark.flint.FlintIndexMetadata.LATEST_ID_KEY; import static org.opensearch.sql.spark.flint.FlintIndexMetadata.META_KEY; +import static org.opensearch.sql.spark.flint.FlintIndexMetadata.NAME_KEY; import static org.opensearch.sql.spark.flint.FlintIndexMetadata.OPTIONS_KEY; import static org.opensearch.sql.spark.flint.FlintIndexMetadata.PROPERTIES_KEY; import static org.opensearch.sql.spark.flint.FlintIndexMetadata.SERVERLESS_EMR_JOB_ID; +import static org.opensearch.sql.spark.flint.FlintIndexMetadata.SOURCE_KEY; import java.util.ArrayList; import java.util.Arrays; @@ -149,9 +152,15 @@ private FlintIndexMetadata fromMetadata(String indexName, Map me String jobId = (String) envMap.get(SERVERLESS_EMR_JOB_ID); String appId = (String) envMap.getOrDefault(APP_ID, null); String latestId = (String) metaMap.getOrDefault(LATEST_ID_KEY, null); + String kind = (String) metaMap.getOrDefault(KIND_KEY, null); + String name = (String) metaMap.getOrDefault(NAME_KEY, null); + String source = (String) metaMap.getOrDefault(SOURCE_KEY, null); flintIndexMetadataBuilder.jobId(jobId); flintIndexMetadataBuilder.appId(appId); flintIndexMetadataBuilder.latestId(latestId); + flintIndexMetadataBuilder.name(name); + flintIndexMetadataBuilder.kind(kind); + flintIndexMetadataBuilder.source(source); flintIndexMetadataBuilder.opensearchIndexName(indexName); flintIndexMetadataBuilder.flintIndexOptions(flintIndexOptions); return flintIndexMetadataBuilder.build(); diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 6a6d5982b8..f2d3bb1aa8 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -58,7 +58,7 @@ public void withoutSessionCreateAsyncQueryThenGetResultThenCancel() { // 1. create async query. CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertFalse(clusterService().state().routingTable().hasIndex(SPARK_REQUEST_BUFFER_INDEX_NAME)); emrsClient.startJobRunCalled(1); @@ -88,12 +88,12 @@ public void sessionLimitNotImpactBatchQuery() { // 1. create async query. CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); emrsClient.startJobRunCalled(1); CreateAsyncQueryResponse resp2 = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); emrsClient.startJobRunCalled(2); } @@ -107,7 +107,7 @@ public void createAsyncQueryCreateJobWithCorrectParameters() { enableSession(false); CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); String params = emrsClient.getJobRequest().getSparkSubmitParams(); assertNull(response.getSessionId()); assertTrue(params.contains(String.format("--class %s", DEFAULT_CLASS_NAME))); @@ -121,7 +121,7 @@ public void createAsyncQueryCreateJobWithCorrectParameters() { enableSession(true); response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); params = emrsClient.getJobRequest().getSparkSubmitParams(); assertTrue(params.contains(String.format("--class %s", FLINT_SESSION_CLASS_NAME))); assertTrue( @@ -141,10 +141,10 @@ public void withSessionCreateAsyncQueryThenGetResultThenCancel() { // 1. create async query. CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(response.getSessionId()); Optional statementModel = - getStatement(stateStore, DATASOURCE).apply(response.getQueryId()); + getStatement(stateStore, MYS3_DATASOURCE).apply(response.getQueryId()); assertTrue(statementModel.isPresent()); assertEquals(StatementState.WAITING, statementModel.get().getStatementState()); @@ -172,14 +172,14 @@ public void reuseSessionWhenCreateAsyncQuery() { // 1. create async query. CreateAsyncQueryResponse first = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(first.getSessionId()); // 2. reuse session id CreateAsyncQueryResponse second = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "select 1", DATASOURCE, LangType.SQL, first.getSessionId())); + "select 1", MYS3_DATASOURCE, LangType.SQL, first.getSessionId())); assertEquals(first.getSessionId(), second.getSessionId()); assertNotEquals(first.getQueryId(), second.getQueryId()); @@ -199,13 +199,13 @@ public void reuseSessionWhenCreateAsyncQuery() { .must(QueryBuilders.termQuery(SESSION_ID, first.getSessionId())))); Optional firstModel = - getStatement(stateStore, DATASOURCE).apply(first.getQueryId()); + getStatement(stateStore, MYS3_DATASOURCE).apply(first.getQueryId()); assertTrue(firstModel.isPresent()); assertEquals(StatementState.WAITING, firstModel.get().getStatementState()); assertEquals(first.getQueryId(), firstModel.get().getStatementId().getId()); assertEquals(first.getQueryId(), firstModel.get().getQueryId()); Optional secondModel = - getStatement(stateStore, DATASOURCE).apply(second.getQueryId()); + getStatement(stateStore, MYS3_DATASOURCE).apply(second.getQueryId()); assertEquals(StatementState.WAITING, secondModel.get().getStatementState()); assertEquals(second.getQueryId(), secondModel.get().getStatementId().getId()); assertEquals(second.getQueryId(), secondModel.get().getQueryId()); @@ -221,7 +221,7 @@ public void batchQueryHasTimeout() { enableSession(false); CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertEquals(120L, (long) emrsClient.getJobRequest().executionTimeout()); } @@ -237,7 +237,7 @@ public void interactiveQueryNoTimeout() { enableSession(true); asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertEquals(0L, (long) emrsClient.getJobRequest().executionTimeout()); } @@ -292,10 +292,10 @@ public void withSessionCreateAsyncQueryFailed() { // 1. create async query. CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("myselect 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("myselect 1", MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(response.getSessionId()); Optional statementModel = - getStatement(stateStore, DATASOURCE).apply(response.getQueryId()); + getStatement(stateStore, MYS3_DATASOURCE).apply(response.getQueryId()); assertTrue(statementModel.isPresent()); assertEquals(StatementState.WAITING, statementModel.get().getStatementState()); @@ -319,7 +319,7 @@ public void withSessionCreateAsyncQueryFailed() { .seqNo(submitted.getSeqNo()) .primaryTerm(submitted.getPrimaryTerm()) .build(); - updateStatementState(stateStore, DATASOURCE).apply(mocked, StatementState.FAILED); + updateStatementState(stateStore, MYS3_DATASOURCE).apply(mocked, StatementState.FAILED); AsyncQueryExecutionResponse asyncQueryResults = asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); @@ -343,7 +343,7 @@ public void createSessionMoreThanLimitFailed() { // 1. create async query. CreateAsyncQueryResponse first = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(first.getSessionId()); setSessionState(first.getSessionId(), SessionState.RUNNING); @@ -353,7 +353,7 @@ public void createSessionMoreThanLimitFailed() { ConcurrencyLimitExceededException.class, () -> asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null))); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null))); assertEquals("domain concurrent active session can not exceed 1", exception.getMessage()); } @@ -371,7 +371,7 @@ public void recreateSessionIfNotReady() { // 1. create async query. CreateAsyncQueryResponse first = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(first.getSessionId()); // set sessionState to FAIL @@ -381,7 +381,7 @@ public void recreateSessionIfNotReady() { CreateAsyncQueryResponse second = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "select 1", DATASOURCE, LangType.SQL, first.getSessionId())); + "select 1", MYS3_DATASOURCE, LangType.SQL, first.getSessionId())); assertNotEquals(first.getSessionId(), second.getSessionId()); @@ -392,7 +392,7 @@ public void recreateSessionIfNotReady() { CreateAsyncQueryResponse third = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "select 1", DATASOURCE, LangType.SQL, second.getSessionId())); + "select 1", MYS3_DATASOURCE, LangType.SQL, second.getSessionId())); assertNotEquals(second.getSessionId(), third.getSessionId()); } @@ -410,7 +410,7 @@ public void submitQueryWithDifferentDataSourceSessionWillCreateNewSession() { CreateAsyncQueryResponse first = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "SHOW SCHEMAS IN " + DATASOURCE, DATASOURCE, LangType.SQL, null)); + "SHOW SCHEMAS IN " + MYS3_DATASOURCE, MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(first.getSessionId()); // set sessionState to RUNNING @@ -420,7 +420,10 @@ public void submitQueryWithDifferentDataSourceSessionWillCreateNewSession() { CreateAsyncQueryResponse second = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "SHOW SCHEMAS IN " + DATASOURCE, DATASOURCE, LangType.SQL, first.getSessionId())); + "SHOW SCHEMAS IN " + MYS3_DATASOURCE, + MYS3_DATASOURCE, + LangType.SQL, + first.getSessionId())); assertEquals(first.getSessionId(), second.getSessionId()); @@ -431,7 +434,10 @@ public void submitQueryWithDifferentDataSourceSessionWillCreateNewSession() { CreateAsyncQueryResponse third = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "SHOW SCHEMAS IN " + DSOTHER, DSOTHER, LangType.SQL, second.getSessionId())); + "SHOW SCHEMAS IN " + MYGLUE_DATASOURCE, + MYGLUE_DATASOURCE, + LangType.SQL, + second.getSessionId())); assertNotEquals(second.getSessionId(), third.getSessionId()); } @@ -448,7 +454,7 @@ public void recreateSessionIfStale() { // 1. create async query. CreateAsyncQueryResponse first = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(first.getSessionId()); // set sessionState to RUNNING @@ -458,7 +464,7 @@ public void recreateSessionIfStale() { CreateAsyncQueryResponse second = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "select 1", DATASOURCE, LangType.SQL, first.getSessionId())); + "select 1", MYS3_DATASOURCE, LangType.SQL, first.getSessionId())); assertEquals(first.getSessionId(), second.getSessionId()); @@ -476,7 +482,7 @@ public void recreateSessionIfStale() { CreateAsyncQueryResponse third = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "select 1", DATASOURCE, LangType.SQL, second.getSessionId())); + "select 1", MYS3_DATASOURCE, LangType.SQL, second.getSessionId())); assertNotEquals(second.getSessionId(), third.getSessionId()); } finally { // set timeout setting to 0 @@ -501,11 +507,11 @@ public void submitQueryInInvalidSessionWillCreateNewSession() { enableSession(true); // 1. create async query with invalid sessionId - SessionId invalidSessionId = SessionId.newSessionId(DATASOURCE); + SessionId invalidSessionId = SessionId.newSessionId(MYS3_DATASOURCE); CreateAsyncQueryResponse asyncQuery = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "select 1", DATASOURCE, LangType.SQL, invalidSessionId.getSessionId())); + "select 1", MYS3_DATASOURCE, LangType.SQL, invalidSessionId.getSessionId())); assertNotNull(asyncQuery.getSessionId()); assertNotEquals(invalidSessionId.getSessionId(), asyncQuery.getSessionId()); } @@ -560,7 +566,7 @@ public void concurrentSessionLimitIsDomainLevel() { // 1. create async query. CreateAsyncQueryResponse first = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(first.getSessionId()); setSessionState(first.getSessionId(), SessionState.RUNNING); @@ -570,7 +576,8 @@ public void concurrentSessionLimitIsDomainLevel() { ConcurrencyLimitExceededException.class, () -> asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DSOTHER, LangType.SQL, null))); + new CreateAsyncQueryRequest( + "select 1", MYGLUE_DATASOURCE, LangType.SQL, null))); assertEquals("domain concurrent active session can not exceed 1", exception.getMessage()); } @@ -583,14 +590,14 @@ public void testDatasourceDisabled() { // Disable Datasource HashMap datasourceMap = new HashMap<>(); - datasourceMap.put("name", DATASOURCE); + datasourceMap.put("name", MYS3_DATASOURCE); datasourceMap.put("status", DataSourceStatus.DISABLED); this.dataSourceService.patchDataSource(datasourceMap); // 1. create async query. try { asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); fail("It should have thrown DataSourceDisabledException"); } catch (DatasourceDisabledException exception) { Assertions.assertEquals("Datasource mys3 is disabled.", exception.getMessage()); diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index c1532d5c10..a6410e7d6b 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -72,8 +72,8 @@ import org.opensearch.test.OpenSearchIntegTestCase; public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase { - public static final String DATASOURCE = "mys3"; - public static final String DSOTHER = "mytest"; + public static final String MYS3_DATASOURCE = "mys3"; + public static final String MYGLUE_DATASOURCE = "my_glue"; protected ClusterService clusterService; protected org.opensearch.sql.common.setting.Settings pluginSettings; @@ -115,7 +115,7 @@ public void setup() { dataSourceService = createDataSourceService(); DataSourceMetadata dm = new DataSourceMetadata.Builder() - .setName(DATASOURCE) + .setName(MYS3_DATASOURCE) .setConnector(DataSourceType.S3GLUE) .setProperties( ImmutableMap.of( @@ -131,7 +131,7 @@ public void setup() { dataSourceService.createDataSource(dm); DataSourceMetadata otherDm = new DataSourceMetadata.Builder() - .setName(DSOTHER) + .setName(MYGLUE_DATASOURCE) .setConnector(DataSourceType.S3GLUE) .setProperties( ImmutableMap.of( @@ -305,7 +305,7 @@ public void setConcurrentRefreshJob(long limit) { int search(QueryBuilder query) { SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(DATASOURCE_TO_REQUEST_INDEX.apply(DATASOURCE)); + searchRequest.indices(DATASOURCE_TO_REQUEST_INDEX.apply(MYS3_DATASOURCE)); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(query); searchRequest.source(searchSourceBuilder); @@ -315,9 +315,9 @@ int search(QueryBuilder query) { } void setSessionState(String sessionId, SessionState sessionState) { - Optional model = getSession(stateStore, DATASOURCE).apply(sessionId); + Optional model = getSession(stateStore, MYS3_DATASOURCE).apply(sessionId); SessionModel updated = - updateSessionState(stateStore, DATASOURCE).apply(model.get(), sessionState); + updateSessionState(stateStore, MYS3_DATASOURCE).apply(model.get(), sessionState); assertEquals(sessionState, updated.getSessionState()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index 3a9b6e12a9..10598d110c 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -53,7 +53,7 @@ public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec { @Before public void doSetUp() { - mockIndexState = new MockFlintSparkJob(stateStore, mockIndex.latestId, DATASOURCE); + mockIndexState = new MockFlintSparkJob(stateStore, mockIndex.latestId, MYS3_DATASOURCE); } @Test @@ -436,7 +436,7 @@ public JSONObject getResultWithQueryId(String queryId, String resultIndex) { }); this.createQueryResponse = queryService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest(query, MYS3_DATASOURCE, LangType.SQL, null)); } AssertionHelper withInteraction(Interaction interaction) { @@ -510,8 +510,8 @@ void emrJobWriteResultDoc(Map resultDoc) { /** Simulate EMR-S updates query_execution_request with state */ void emrJobUpdateStatementState(StatementState newState) { - StatementModel stmt = getStatement(stateStore, DATASOURCE).apply(queryId).get(); - StateStore.updateStatementState(stateStore, DATASOURCE).apply(stmt, newState); + StatementModel stmt = getStatement(stateStore, MYS3_DATASOURCE).apply(queryId).get(); + StateStore.updateStatementState(stateStore, MYS3_DATASOURCE).apply(stmt, newState); } void emrJobUpdateJobState(JobRunState jobState) { @@ -525,7 +525,7 @@ private Map createEmptyResultDoc(String queryId) { document.put("schema", ImmutableList.of()); document.put("jobRunId", "XXX"); document.put("applicationId", "YYY"); - document.put("dataSourceName", DATASOURCE); + document.put("dataSourceName", MYS3_DATASOURCE); document.put("status", "SUCCESS"); document.put("error", ""); document.put("queryId", queryId); @@ -550,7 +550,7 @@ private Map createResultDoc( document.put("schema", schema); document.put("jobRunId", "XXX"); document.put("applicationId", "YYY"); - document.put("dataSourceName", DATASOURCE); + document.put("dataSourceName", MYS3_DATASOURCE); document.put("status", "SUCCESS"); document.put("error", ""); document.put("queryId", queryId); diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java index 132074de63..cc9daadb3c 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java @@ -113,7 +113,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { // 1.drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(response.getQueryId()); assertTrue(clusterService.state().routingTable().hasIndex(mockDS.indexName)); @@ -161,7 +162,8 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { // 1.drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2.fetch result. AsyncQueryExecutionResponse asyncQueryResults = @@ -199,7 +201,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { // 1. drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryResults = @@ -234,13 +237,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1.drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(response.getQueryId()); assertTrue(clusterService.state().routingTable().hasIndex(mockDS.indexName)); @@ -290,13 +294,14 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state in refresh state. MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1.drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2.fetch result. AsyncQueryExecutionResponse asyncQueryResults = @@ -334,13 +339,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1. drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryResults = @@ -377,13 +383,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1. drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result assertEquals( @@ -425,13 +432,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.active(); // 1. drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -470,13 +478,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.creating(); // 1. drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result assertEquals( @@ -515,12 +524,13 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); // 1. drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result assertEquals( @@ -565,13 +575,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.deleting(); // 1. drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); AsyncQueryExecutionResponse asyncQueryExecutionResponse = asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); @@ -618,7 +629,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { // 1. drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryResults = @@ -638,7 +650,7 @@ public void concurrentRefreshJobLimitNotApplied() { COVERING.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, COVERING.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, COVERING.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // query with auto refresh @@ -647,7 +659,7 @@ public void concurrentRefreshJobLimitNotApplied() { + "l_quantity) WITH (auto_refresh = true)"; CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest(query, MYS3_DATASOURCE, LangType.SQL, null)); assertNull(response.getSessionId()); } @@ -663,7 +675,7 @@ public void concurrentRefreshJobLimitAppliedToDDLWithAuthRefresh() { COVERING.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, COVERING.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, COVERING.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // query with auto_refresh = true. @@ -675,7 +687,7 @@ public void concurrentRefreshJobLimitAppliedToDDLWithAuthRefresh() { ConcurrencyLimitExceededException.class, () -> asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null))); + new CreateAsyncQueryRequest(query, MYS3_DATASOURCE, LangType.SQL, null))); assertEquals("domain concurrent refresh job can not exceed 1", exception.getMessage()); } @@ -691,7 +703,7 @@ public void concurrentRefreshJobLimitAppliedToRefresh() { COVERING.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, COVERING.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, COVERING.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // query with auto_refresh = true. @@ -701,7 +713,7 @@ public void concurrentRefreshJobLimitAppliedToRefresh() { ConcurrencyLimitExceededException.class, () -> asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null))); + new CreateAsyncQueryRequest(query, MYS3_DATASOURCE, LangType.SQL, null))); assertEquals("domain concurrent refresh job can not exceed 1", exception.getMessage()); } @@ -718,12 +730,12 @@ public void concurrentRefreshJobLimitNotAppliedToDDL() { COVERING.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, COVERING.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, COVERING.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); CreateAsyncQueryResponse asyncQueryResponse = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest(query, MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(asyncQueryResponse.getSessionId()); } @@ -754,7 +766,7 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { // 1. submit create / refresh index query CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest(query, MYS3_DATASOURCE, LangType.SQL, null)); // 2. cancel query IllegalArgumentException exception = @@ -790,13 +802,13 @@ public GetJobRunResult getJobRunResult( mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); // 1. Submit REFRESH statement CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.refreshQuery, DATASOURCE, LangType.SQL, null)); + mockDS.refreshQuery, MYS3_DATASOURCE, LangType.SQL, null)); // mock index state. flintIndexJob.refreshing(); @@ -833,13 +845,13 @@ public GetJobRunResult getJobRunResult( mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); // 1. Submit REFRESH statement CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.refreshQuery, DATASOURCE, LangType.SQL, null)); + mockDS.refreshQuery, MYS3_DATASOURCE, LangType.SQL, null)); // mock index state. flintIndexJob.active(); @@ -875,14 +887,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockFlintIndex.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, indexName + "_latest_id", DATASOURCE); + new MockFlintSparkJob(stateStore, indexName + "_latest_id", MYS3_DATASOURCE); // 1. Submit REFRESH statement CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( "REFRESH INDEX covering_corrupted ON my_glue.mydb.http_logs", - DATASOURCE, + MYS3_DATASOURCE, LangType.SQL, null)); // mock index state. @@ -940,14 +952,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1007,14 +1019,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1086,14 +1098,14 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1148,14 +1160,14 @@ public void testAlterIndexQueryConvertingToAutoRefresh() { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result assertEquals( @@ -1212,14 +1224,14 @@ public void testAlterIndexQueryWithOutAnyAutoRefresh() { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result assertEquals( @@ -1285,14 +1297,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1358,14 +1370,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1425,14 +1437,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1485,14 +1497,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1547,14 +1559,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1609,14 +1621,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1668,14 +1680,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1725,14 +1737,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.updating(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java index 67c89c791c..ba8a14d851 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java @@ -201,7 +201,7 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { // Vacuum index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest(mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); return asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTaskTest.java b/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTaskTest.java new file mode 100644 index 0000000000..58f7c9b4c0 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTaskTest.java @@ -0,0 +1,517 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.cluster; + +import static org.opensearch.sql.datasource.model.DataSourceStatus.DISABLED; + +import com.amazonaws.services.emrserverless.model.GetJobRunResult; +import com.amazonaws.services.emrserverless.model.JobRun; +import com.google.common.collect.ImmutableList; +import java.util.HashMap; +import java.util.Map; +import lombok.SneakyThrows; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.opensearch.sql.datasource.model.DataSourceStatus; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceSpec; +import org.opensearch.sql.spark.asyncquery.model.MockFlintIndex; +import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; +import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexType; + +public class FlintStreamingJobHouseKeeperTaskTest extends AsyncQueryExecutorServiceSpec { + + @Test + @SneakyThrows + public void testStreamingJobHouseKeeperWhenDataSourceDisabled() { + MockFlintIndex SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + MockFlintIndex COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + MockFlintIndex MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\") "); + Map indexJobMapping = new HashMap<>(); + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + INDEX -> { + INDEX.createIndex(); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + indexJobMapping.put(INDEX, flintIndexJob); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + // Making Index Auto Refresh + INDEX.updateIndexOptions(existingOptions, false); + flintIndexJob.refreshing(); + }); + changeDataSourceStatus(MYGLUE_DATASOURCE, DISABLED); + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); + FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = + new FlintStreamingJobHouseKeeperTask( + dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); + thread.start(); + thread.join(); + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + INDEX -> { + MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = INDEX.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + emrsClient.cancelJobRunCalled(3); + emrsClient.getJobRunResultCalled(3); + emrsClient.startJobRunCalled(0); + Assertions.assertEquals( + 0L, + Metrics.getInstance() + .getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT) + .getValue()); + } + + @Test + @SneakyThrows + public void testStreamingJobHouseKeeperWhenCancelJobGivesTimeout() { + MockFlintIndex SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + MockFlintIndex COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + MockFlintIndex MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\") "); + Map indexJobMapping = new HashMap<>(); + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + INDEX -> { + INDEX.createIndex(); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + indexJobMapping.put(INDEX, flintIndexJob); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + // Making Index Auto Refresh + INDEX.updateIndexOptions(existingOptions, false); + flintIndexJob.refreshing(); + }); + changeDataSourceStatus(MYGLUE_DATASOURCE, DISABLED); + LocalEMRSClient emrsClient = new LocalEMRSClient(); + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); + FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = + new FlintStreamingJobHouseKeeperTask( + dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); + thread.start(); + thread.join(); + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + INDEX -> { + MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); + flintIndexJob.assertState(FlintIndexState.REFRESHING); + Map mappings = INDEX.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + emrsClient.cancelJobRunCalled(3); + emrsClient.getJobRunResultCalled(9); + emrsClient.startJobRunCalled(0); + Assertions.assertEquals( + 3L, + Metrics.getInstance() + .getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT) + .getValue()); + } + + @Test + @SneakyThrows + public void testSimulateConcurrentJobHouseKeeperExecution() { + MockFlintIndex SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + MockFlintIndex COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + MockFlintIndex MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\") "); + Map indexJobMapping = new HashMap<>(); + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + INDEX -> { + INDEX.createIndex(); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + indexJobMapping.put(INDEX, flintIndexJob); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + // Making Index Auto Refresh + INDEX.updateIndexOptions(existingOptions, false); + flintIndexJob.refreshing(); + }); + changeDataSourceStatus(MYGLUE_DATASOURCE, DISABLED); + LocalEMRSClient emrsClient = new LocalEMRSClient(); + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); + FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = + new FlintStreamingJobHouseKeeperTask( + dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + FlintStreamingJobHouseKeeperTask.isRunning.compareAndSet(false, true); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); + thread.start(); + thread.join(); + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + INDEX -> { + MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); + flintIndexJob.assertState(FlintIndexState.REFRESHING); + Map mappings = INDEX.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); + emrsClient.cancelJobRunCalled(0); + emrsClient.getJobRunResultCalled(0); + emrsClient.startJobRunCalled(0); + Assertions.assertEquals( + 0L, + Metrics.getInstance() + .getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT) + .getValue()); + FlintStreamingJobHouseKeeperTask.isRunning.compareAndSet(true, false); + } + + @SneakyThrows + @Test + public void testStreamingJobClearnerWhenDataSourceIsDeleted() { + MockFlintIndex SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + MockFlintIndex COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + MockFlintIndex MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\") "); + Map indexJobMapping = new HashMap<>(); + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + INDEX -> { + INDEX.createIndex(); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + indexJobMapping.put(INDEX, flintIndexJob); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + // Making Index Auto Refresh + INDEX.updateIndexOptions(existingOptions, false); + flintIndexJob.refreshing(); + }); + this.dataSourceService.deleteDataSource(MYGLUE_DATASOURCE); + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); + FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = + new FlintStreamingJobHouseKeeperTask( + dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); + thread.start(); + thread.join(); + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + INDEX -> { + MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); + flintIndexJob.assertState(FlintIndexState.DELETED); + Map mappings = INDEX.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); + emrsClient.cancelJobRunCalled(3); + emrsClient.getJobRunResultCalled(3); + emrsClient.startJobRunCalled(0); + Assertions.assertEquals( + 0L, + Metrics.getInstance() + .getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT) + .getValue()); + } + + @Test + @SneakyThrows + public void testStreamingJobHouseKeeperWhenDataSourceIsNeitherDisabledNorDeleted() { + MockFlintIndex SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + MockFlintIndex COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + MockFlintIndex MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\") "); + Map indexJobMapping = new HashMap<>(); + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + INDEX -> { + INDEX.createIndex(); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + indexJobMapping.put(INDEX, flintIndexJob); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + // Making Index Auto Refresh + INDEX.updateIndexOptions(existingOptions, false); + flintIndexJob.refreshing(); + }); + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); + FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = + new FlintStreamingJobHouseKeeperTask( + dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); + thread.start(); + thread.join(); + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + INDEX -> { + MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); + flintIndexJob.assertState(FlintIndexState.REFRESHING); + Map mappings = INDEX.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); + emrsClient.cancelJobRunCalled(0); + emrsClient.getJobRunResultCalled(0); + emrsClient.startJobRunCalled(0); + Assertions.assertEquals( + 0L, + Metrics.getInstance() + .getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT) + .getValue()); + } + + @Test + public void testStreamingJobHouseKeeperWhenS3GlueIsDisabledButNotStreamingJobQueries() + throws InterruptedException { + changeDataSourceStatus(MYGLUE_DATASOURCE, DISABLED); + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); + FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = + new FlintStreamingJobHouseKeeperTask( + dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); + thread.start(); + thread.join(); + emrsClient.getJobRunResultCalled(0); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(0); + Assertions.assertEquals( + 0L, + Metrics.getInstance() + .getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT) + .getValue()); + } + + @Test + public void testStreamingJobHouseKeeperWhenFlintIndexIsCorrupted() throws InterruptedException { + String indexName = "flint_my_glue_mydb_http_logs_covering_error_index"; + MockFlintIndex mockFlintIndex = + new MockFlintIndex(client(), indexName, FlintIndexType.COVERING, null); + mockFlintIndex.createIndex(); + changeDataSourceStatus(MYGLUE_DATASOURCE, DISABLED); + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); + FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = + new FlintStreamingJobHouseKeeperTask( + dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); + thread.start(); + thread.join(); + emrsClient.getJobRunResultCalled(0); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(0); + Assertions.assertEquals( + 1L, + Metrics.getInstance() + .getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT) + .getValue()); + } + + @SneakyThrows + @Test + public void testErrorScenario() { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + FlintIndexMetadataService flintIndexMetadataService = + new FlintIndexMetadataService() { + @Override + public Map getFlintIndexMetadata(String indexPattern) { + throw new RuntimeException("Couldn't fetch details from ElasticSearch"); + } + + @Override + public void updateIndexToManualRefresh( + String indexName, FlintIndexOptions flintIndexOptions) {} + }; + FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = + new FlintStreamingJobHouseKeeperTask( + dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); + thread.start(); + thread.join(); + Assertions.assertFalse(FlintStreamingJobHouseKeeperTask.isRunning.get()); + emrsClient.getJobRunResultCalled(0); + emrsClient.startJobRunCalled(0); + emrsClient.cancelJobRunCalled(0); + Assertions.assertEquals( + 1L, + Metrics.getInstance() + .getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT) + .getValue()); + } + + private void changeDataSourceStatus(String dataSourceName, DataSourceStatus dataSourceStatus) { + HashMap datasourceMap = new HashMap<>(); + datasourceMap.put("name", dataSourceName); + datasourceMap.put("status", dataSourceStatus); + this.dataSourceService.patchDataSource(datasourceMap); + } +} diff --git a/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_error_index_mapping.json b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_error_index_mapping.json new file mode 100644 index 0000000000..edd71b41db --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_error_index_mapping.json @@ -0,0 +1,39 @@ +{ + "_meta": { + "latestId": "flint_my_glue_mydb_http_logs_covering_error_index_latest_id", + "kind": "random", + "indexedColumns": [ + { + "columnType": "string", + "columnName": "clientip" + }, + { + "columnType": "int", + "columnName": "status" + } + ], + "name": "covering", + "options": { + "auto_refresh": "true", + "incremental_refresh": "false", + "index_settings": "{\"number_of_shards\":5,\"number_of_replicas\":1}", + "checkpoint_location": "s3://vamsicheckpoint/cv/" + }, + "source": "my_glue.mydb.http_logs", + "version": "0.2.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fhh7frokkf0k0l", + "SERVERLESS_EMR_JOB_ID": "00fhoag6i0671o0m" + } + } + }, + "properties": { + "clientip": { + "type": "keyword" + }, + "status": { + "type": "integer" + } + } +} \ No newline at end of file