Skip to content

Commit

Permalink
FlintStreamingJobCleanerTask Implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsimanohar committed Mar 18, 2024
1 parent 4319e3f commit 49ed074
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public enum MetricName {
EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT("emr_cancel_job_request_failure_count"),
EMR_STREAMING_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"),
EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT("emr_interactive_jobs_creation_count"),
EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count");
EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count"),
STREAMING_JOB_CLEANER_TASK_FAILURE_COUNT("streaming_job_cleaner_task_failure_count");

private String name;

Expand Down Expand Up @@ -91,6 +92,7 @@ public static List<String> getNames() {
.add(ASYNC_QUERY_CREATE_API_REQUEST_COUNT)
.add(ASYNC_QUERY_GET_API_REQUEST_COUNT)
.add(ASYNC_QUERY_CANCEL_API_REQUEST_COUNT)
.add(STREAMING_JOB_CLEANER_TASK_FAILURE_COUNT)
.build();

public boolean isNumerical() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.cluster.ClusterManagerEventListener;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
Expand Down Expand Up @@ -221,7 +224,11 @@ public Collection<Object> createComponents(
OpenSearchSettings.SESSION_INDEX_TTL_SETTING,
OpenSearchSettings.RESULT_INDEX_TTL_SETTING,
OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING,
environment.settings());
environment.settings(),
dataSourceService,
injector.getInstance(FlintIndexMetadataService.class),
injector.getInstance(StateStore.class),
injector.getInstance(EMRServerlessClientFactory.class));
return ImmutableList.of(
dataSourceService,
injector.getInstance(AsyncQueryExecutorService.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,26 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.threadpool.Scheduler.Cancellable;
import org.opensearch.threadpool.ThreadPool;

public class ClusterManagerEventListener implements LocalNodeClusterManagerListener {

private Cancellable flintIndexRetentionCron;
private Cancellable flintStreamingJobCleanerCron;
private ClusterService clusterService;
private ThreadPool threadPool;
private Client client;
private Clock clock;
private DataSourceService dataSourceService;
private FlintIndexMetadataService flintIndexMetadataService;
private StateStore stateStore;
private EMRServerlessClientFactory emrServerlessClientFactory;
private Duration sessionTtlDuration;
private Duration resultTtlDuration;
private boolean isAutoIndexManagementEnabled;
Expand All @@ -42,13 +51,20 @@ public ClusterManagerEventListener(
Setting<TimeValue> sessionTtl,
Setting<TimeValue> resultTtl,
Setting<Boolean> isAutoIndexManagementEnabledSetting,
Settings settings) {
Settings settings,
DataSourceService dataSourceService,
FlintIndexMetadataService flintIndexMetadataService,
StateStore stateStore,
EMRServerlessClientFactory emrServerlessClientFactory) {
this.clusterService = clusterService;
this.threadPool = threadPool;
this.client = client;
this.clusterService.addLocalNodeClusterManagerListener(this);
this.clock = clock;

this.dataSourceService = dataSourceService;
this.flintIndexMetadataService = flintIndexMetadataService;
this.stateStore = stateStore;
this.emrServerlessClientFactory = emrServerlessClientFactory;
this.sessionTtlDuration = toDuration(sessionTtl.get(settings));
this.resultTtlDuration = toDuration(resultTtl.get(settings));

Expand Down Expand Up @@ -104,6 +120,19 @@ public void beforeStop() {
}
});
}
initializeStreamingJobCleanerCron();
}

private void initializeStreamingJobCleanerCron() {
flintStreamingJobCleanerCron =
threadPool.scheduleWithFixedDelay(
new FlintStreamingJobCleanerTask(
dataSourceService,
flintIndexMetadataService,
stateStore,
emrServerlessClientFactory),
TimeValue.timeValueMinutes(15),
executorName());
}

private void reInitializeFlintIndexRetention() {
Expand All @@ -125,6 +154,8 @@ private void reInitializeFlintIndexRetention() {
public void offClusterManager() {
cancel(flintIndexRetentionCron);
flintIndexRetentionCron = null;
cancel(flintStreamingJobCleanerCron);
flintStreamingJobCleanerCron = null;
}

private void cancel(Cancellable cron) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.cluster;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceStatus;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;

/** Cleaner task which alters the active streaming jobs of a disabled datasource. */
@RequiredArgsConstructor
public class FlintStreamingJobCleanerTask implements Runnable {

private final DataSourceService dataSourceService;
private final FlintIndexMetadataService flintIndexMetadataService;
private final StateStore stateStore;
private final EMRServerlessClientFactory emrServerlessClientFactory;

private static final Logger LOGGER = LogManager.getLogger(FlintStreamingJobCleanerTask.class);
protected static final AtomicBoolean isRunning = new AtomicBoolean(false);

@Override
public void run() {
if (!isRunning.compareAndSet(false, true)) {
LOGGER.info("Previous task is still running. Skipping this execution.");
return;
}
try {
LOGGER.info("Starting the cleaner task for disabled and deleted data sources.");
List<DataSourceMetadata> s3GlueDisabledDataSources = getS3GlueDataSources();
Set<String> disabledS3DataSources =
s3GlueDisabledDataSources.stream()
.filter(
dataSourceMetadata -> dataSourceMetadata.getStatus() == DataSourceStatus.DISABLED)
.map(DataSourceMetadata::getName)
.collect(Collectors.toSet());
Set<String> allS3DataSources =
s3GlueDisabledDataSources.stream()
.map(DataSourceMetadata::getName)
.collect(Collectors.toSet());
Map<String, FlintIndexMetadata> autoRefreshFlintIndicesMap = getAllAutoRefreshIndices();
autoRefreshFlintIndicesMap.forEach(
(autoRefreshIndex, flintIndexMetadata) -> {
try {
String datasourceName = getDataSourceName(autoRefreshIndex);
if (disabledS3DataSources.contains(datasourceName)) {
LOGGER.debug("Attempting to alter index: {}", autoRefreshIndex);
FlintIndexOptions flintIndexOptions = new FlintIndexOptions();
flintIndexOptions.setOption(FlintIndexOptions.AUTO_REFRESH, "false");
FlintIndexOpAlter flintIndexOpAlter =
new FlintIndexOpAlter(
flintIndexOptions,
stateStore,
datasourceName,
emrServerlessClientFactory.getClient(),
flintIndexMetadataService);
flintIndexOpAlter.apply(flintIndexMetadata);
LOGGER.info("Successfully altered index: {}", autoRefreshIndex);
} else if (!allS3DataSources.contains(datasourceName)) {
//Possibly Replace with VACCUM Operation.
LOGGER.debug("Attempting to cancel auto refresh index: {}", autoRefreshIndex);
FlintIndexOpCancel flintIndexOpCancel =
new FlintIndexOpCancel(
stateStore, datasourceName, emrServerlessClientFactory.getClient());
flintIndexOpCancel.apply(flintIndexMetadata);
LOGGER.info("Successfully cancelled index: {}", autoRefreshIndex);
}
} catch (Exception exception) {
LOGGER.error(
"Failed to alter/cancel index {}: {}",
autoRefreshIndex,
exception.getMessage(),
exception);
Metrics.getInstance()
.getNumericalMetric(MetricName.STREAMING_JOB_CLEANER_TASK_FAILURE_COUNT)
.increment();
}
});
} catch (Throwable error) {
LOGGER.info("Error while running the streaming job cleaner task: {}", error.getMessage());
} finally {
isRunning.set(false);
}
}

private String getDataSourceName(String autoRefreshIndex) {
String[] split = autoRefreshIndex.split("_");
return split.length > 1 ? split[1] : StringUtils.EMPTY;
}

private Map<String, FlintIndexMetadata> getAllAutoRefreshIndices() {
Map<String, FlintIndexMetadata> flintIndexMetadataHashMap =
flintIndexMetadataService.getFlintIndexMetadata("flint_*");
return flintIndexMetadataHashMap.entrySet().stream()
.filter(entry -> entry.getValue().getFlintIndexOptions().autoRefresh())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

private List<DataSourceMetadata> getS3GlueDataSources() {
return this.dataSourceService.getDataSourceMetadata(false).stream()
.filter(dataSourceMetadata -> dataSourceMetadata.getConnector() == DataSourceType.S3GLUE)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@

public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase {
public static final String DATASOURCE = "mys3";
public static final String DSOTHER = "mytest";
public static final String DSOTHER = "my_glue";

protected ClusterService clusterService;
protected org.opensearch.sql.common.setting.Settings pluginSettings;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.opensearch.sql.spark.cluster;

import com.google.common.collect.ImmutableMap;
import org.junit.Before;
import org.junit.Test;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceSpec;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;

public class FlintStreamingJobCleanerTaskTest extends AsyncQueryExecutorServiceSpec {

@Before
@Override
public void setup() {
super.setup();
// Having other type just for extra datasource.
DataSourceMetadata prometheusDataSource =
new DataSourceMetadata.Builder()
.setName("prometheus")
.setConnector(DataSourceType.PROMETHEUS)
.setProperties(ImmutableMap.of("prometheus.uri", "http://localhost:9080"))
.build();
dataSourceService.createDataSource(prometheusDataSource);
}

@Test
public void testStreamingJobCleanerWhenDataSourceDisabled() {
LocalEMRSClient localEMRSClient = new LocalEMRSClient();
EMRServerlessClientFactory emrServerlessClientFactory = () -> localEMRSClient;

FlintStreamingJobCleanerTask flintStreamingJobCleanerTask
= new FlintStreamingJobCleanerTask(dataSourceService , stateStore, emrServerlessClientFactory)
}

@Test
public void testMultipleStreamingJobCleanerWhenDataSourceDisabled() {}

@Test
public void testStreamingJobClearnerWhenDataSourceIsDeleted() {}

@Test
public void testStreamingJobCleanerWhenDataSourceIsNeitherDisabledNorDeleted() {}

@Test
public void testStreamingJobClearnerWhenADatasourceIsDeletedAndAnotherIsDisabled() {}
}

0 comments on commit 49ed074

Please sign in to comment.