From 3e4c0e96453b54eabef3306946f33a71af85e47a Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Sun, 17 Mar 2024 20:17:29 -0700 Subject: [PATCH] FlintStreamingJobCleanerTask Implementation Signed-off-by: Vamsi Manohar --- .../sql/legacy/metrics/MetricName.java | 4 +- .../org/opensearch/sql/plugin/SQLPlugin.java | 9 +- .../cluster/ClusterManagerEventListener.java | 35 +++++- .../cluster/FlintStreamingJobCleanerTask.java | 108 ++++++++++++++++++ 4 files changed, 152 insertions(+), 4 deletions(-) create mode 100644 spark/src/main/java/org/opensearch/sql/spark/cluster/FlintStreamingJobCleanerTask.java diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java index 91ade7b038..4c3f24a9a9 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java @@ -47,7 +47,8 @@ public enum MetricName { EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT("emr_cancel_job_request_failure_count"), EMR_STREAMING_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"), EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT("emr_interactive_jobs_creation_count"), - EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count"); + EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count"), + STREAMING_JOB_CLEANER_TASK_FAILURE_COUNT("streaming_job_cleaner_task_failure_count"); private String name; @@ -91,6 +92,7 @@ public static List getNames() { .add(ASYNC_QUERY_CREATE_API_REQUEST_COUNT) .add(ASYNC_QUERY_GET_API_REQUEST_COUNT) .add(ASYNC_QUERY_CANCEL_API_REQUEST_COUNT) + .add(STREAMING_JOB_CLEANER_TASK_FAILURE_COUNT) .build(); public boolean isNumerical() { diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 2b75a8b2c9..8487f30716 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -79,7 +79,10 @@ import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.cluster.ClusterManagerEventListener; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction; import org.opensearch.sql.spark.storage.SparkStorageFactory; import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction; @@ -221,7 +224,11 @@ public Collection createComponents( OpenSearchSettings.SESSION_INDEX_TTL_SETTING, OpenSearchSettings.RESULT_INDEX_TTL_SETTING, OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING, - environment.settings()); + environment.settings(), + dataSourceService, + injector.getInstance(FlintIndexMetadataService.class), + injector.getInstance(StateStore.class), + injector.getInstance(EMRServerlessClientFactory.class)); return ImmutableList.of( dataSourceService, injector.getInstance(AsyncQueryExecutorService.class), diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java index 3d004b548f..9bea1f4b0d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java +++ b/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java @@ -19,17 +19,26 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.threadpool.Scheduler.Cancellable; import org.opensearch.threadpool.ThreadPool; public class ClusterManagerEventListener implements LocalNodeClusterManagerListener { private Cancellable flintIndexRetentionCron; + private Cancellable flintStreamingJobCleanerCron; private ClusterService clusterService; private ThreadPool threadPool; private Client client; private Clock clock; + private DataSourceService dataSourceService; + private FlintIndexMetadataService flintIndexMetadataService; + private StateStore stateStore; + private EMRServerlessClientFactory emrServerlessClientFactory; private Duration sessionTtlDuration; private Duration resultTtlDuration; private boolean isAutoIndexManagementEnabled; @@ -42,13 +51,20 @@ public ClusterManagerEventListener( Setting sessionTtl, Setting resultTtl, Setting isAutoIndexManagementEnabledSetting, - Settings settings) { + Settings settings, + DataSourceService dataSourceService, + FlintIndexMetadataService flintIndexMetadataService, + StateStore stateStore, + EMRServerlessClientFactory emrServerlessClientFactory) { this.clusterService = clusterService; this.threadPool = threadPool; this.client = client; this.clusterService.addLocalNodeClusterManagerListener(this); this.clock = clock; - + this.dataSourceService = dataSourceService; + this.flintIndexMetadataService = flintIndexMetadataService; + this.stateStore = stateStore; + this.emrServerlessClientFactory = emrServerlessClientFactory; this.sessionTtlDuration = toDuration(sessionTtl.get(settings)); this.resultTtlDuration = toDuration(resultTtl.get(settings)); @@ -104,6 +120,19 @@ public void beforeStop() { } }); } + initializeStreamingJobCleanerCron(); + } + + private void initializeStreamingJobCleanerCron() { + flintStreamingJobCleanerCron = + threadPool.scheduleWithFixedDelay( + new FlintStreamingJobCleanerTask( + dataSourceService, + flintIndexMetadataService, + stateStore, + emrServerlessClientFactory), + TimeValue.timeValueMinutes(15), + executorName()); } private void reInitializeFlintIndexRetention() { @@ -125,6 +154,8 @@ private void reInitializeFlintIndexRetention() { public void offClusterManager() { cancel(flintIndexRetentionCron); flintIndexRetentionCron = null; + cancel(flintStreamingJobCleanerCron); + flintStreamingJobCleanerCron = null; } private void cancel(Cancellable cron) { diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintStreamingJobCleanerTask.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintStreamingJobCleanerTask.java new file mode 100644 index 0000000000..53259c1f6c --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintStreamingJobCleanerTask.java @@ -0,0 +1,108 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.cluster; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceStatus; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter; + +/** Cleaner task which alters the active streaming jobs of a disabled datasource. */ +@RequiredArgsConstructor +public class FlintStreamingJobCleanerTask implements Runnable { + + private final DataSourceService dataSourceService; + private final FlintIndexMetadataService flintIndexMetadataService; + private final StateStore stateStore; + private final EMRServerlessClientFactory emrServerlessClientFactory; + + private static final Logger LOGGER = LogManager.getLogger(FlintStreamingJobCleanerTask.class); + private static final AtomicBoolean isRunning = new AtomicBoolean(false); + + @Override + public void run() { + if (!isRunning.compareAndSet(false, true)) { + LOGGER.info("Previous task is still running. Skipping this execution."); + return; + } + try { + LOGGER.info("Starting the cleaner task for disabled data sources."); + List s3GlueDisabledDataSources = getS3GlueDisabledDataSources(); + LOGGER.info("Found {} disabled data sources to process.", s3GlueDisabledDataSources.size()); + for (DataSourceMetadata dataSourceMetadata : s3GlueDisabledDataSources) { + LOGGER.info("Processing disabled data source: {}", dataSourceMetadata.getName()); + Map autoRefreshFlintIndicesMap = + getAutoRefreshIndicesOfDataSource(dataSourceMetadata); + LOGGER.info( + "Found {} auto-refresh indices to alter for data source: {}", + autoRefreshFlintIndicesMap.size(), + dataSourceMetadata.getName()); + autoRefreshFlintIndicesMap.forEach( + (autoRefreshIndex, flintIndexMetadata) -> { + try { + LOGGER.debug("Attempting to alter index: {}", autoRefreshIndex); + FlintIndexOptions flintIndexOptions = new FlintIndexOptions(); + flintIndexOptions.setOption(FlintIndexOptions.AUTO_REFRESH, "false"); + FlintIndexOpAlter flintIndexOpAlter = + new FlintIndexOpAlter( + flintIndexOptions, + stateStore, + dataSourceMetadata.getName(), + emrServerlessClientFactory.getClient(), + flintIndexMetadataService); + flintIndexOpAlter.apply(flintIndexMetadata); + LOGGER.info("Successfully altered index: {}", autoRefreshIndex); + } catch (Exception exception) { + LOGGER.error( + "Failed to alter index {}: {}", + autoRefreshIndex, + exception.getMessage(), + exception); + Metrics.getInstance() + .getNumericalMetric(MetricName.STREAMING_JOB_CLEANER_TASK_FAILURE_COUNT) + .increment(); + } + }); + } + } catch (Throwable error) { + LOGGER.info("Error while running the streaming job cleaner task: {}", error.getMessage()); + } finally { + isRunning.set(false); + } + } + + private Map getAutoRefreshIndicesOfDataSource( + DataSourceMetadata dataSourceMetadata) { + Map flintIndexMetadataHashMap = + flintIndexMetadataService.getFlintIndexMetadata( + "flint_" + dataSourceMetadata.getName() + "_*"); + return flintIndexMetadataHashMap.entrySet().stream() + .filter(entry -> entry.getValue().getFlintIndexOptions().autoRefresh()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private List getS3GlueDisabledDataSources() { + return this.dataSourceService.getDataSourceMetadata(false).stream() + .filter(dataSourceMetadata -> dataSourceMetadata.getConnector() == DataSourceType.S3GLUE) + .filter(dataSourceMetadata -> dataSourceMetadata.getStatus() == DataSourceStatus.DISABLED) + .collect(Collectors.toList()); + } +}