Skip to content

Commit

Permalink
FlintStreamingJobCleanerTask Implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Mar 19, 2024
1 parent 8374cb6 commit 6379aa8
Show file tree
Hide file tree
Showing 12 changed files with 426 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public enum MetricName {
EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT("emr_cancel_job_request_failure_count"),
EMR_STREAMING_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"),
EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT("emr_interactive_jobs_creation_count"),
EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count");
EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count"),
STREAMING_JOB_CLEANER_TASK_FAILURE_COUNT("streaming_job_cleaner_task_failure_count");

private String name;

Expand Down Expand Up @@ -91,6 +92,7 @@ public static List<String> getNames() {
.add(ASYNC_QUERY_CREATE_API_REQUEST_COUNT)
.add(ASYNC_QUERY_GET_API_REQUEST_COUNT)
.add(ASYNC_QUERY_CANCEL_API_REQUEST_COUNT)
.add(STREAMING_JOB_CLEANER_TASK_FAILURE_COUNT)
.build();

public boolean isNumerical() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.cluster.ClusterManagerEventListener;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
Expand Down Expand Up @@ -221,7 +224,11 @@ public Collection<Object> createComponents(
OpenSearchSettings.SESSION_INDEX_TTL_SETTING,
OpenSearchSettings.RESULT_INDEX_TTL_SETTING,
OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING,
environment.settings());
environment.settings(),
dataSourceService,
injector.getInstance(FlintIndexMetadataService.class),
injector.getInstance(StateStore.class),
injector.getInstance(EMRServerlessClientFactory.class));
return ImmutableList.of(
dataSourceService,
injector.getInstance(AsyncQueryExecutorService.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,26 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.threadpool.Scheduler.Cancellable;
import org.opensearch.threadpool.ThreadPool;

public class ClusterManagerEventListener implements LocalNodeClusterManagerListener {

private Cancellable flintIndexRetentionCron;
private Cancellable flintStreamingJobCleanerCron;
private ClusterService clusterService;
private ThreadPool threadPool;
private Client client;
private Clock clock;
private DataSourceService dataSourceService;
private FlintIndexMetadataService flintIndexMetadataService;
private StateStore stateStore;
private EMRServerlessClientFactory emrServerlessClientFactory;
private Duration sessionTtlDuration;
private Duration resultTtlDuration;
private boolean isAutoIndexManagementEnabled;
Expand All @@ -42,13 +51,20 @@ public ClusterManagerEventListener(
Setting<TimeValue> sessionTtl,
Setting<TimeValue> resultTtl,
Setting<Boolean> isAutoIndexManagementEnabledSetting,
Settings settings) {
Settings settings,
DataSourceService dataSourceService,
FlintIndexMetadataService flintIndexMetadataService,
StateStore stateStore,
EMRServerlessClientFactory emrServerlessClientFactory) {
this.clusterService = clusterService;
this.threadPool = threadPool;
this.client = client;
this.clusterService.addLocalNodeClusterManagerListener(this);
this.clock = clock;

this.dataSourceService = dataSourceService;
this.flintIndexMetadataService = flintIndexMetadataService;
this.stateStore = stateStore;
this.emrServerlessClientFactory = emrServerlessClientFactory;
this.sessionTtlDuration = toDuration(sessionTtl.get(settings));
this.resultTtlDuration = toDuration(resultTtl.get(settings));

Expand Down Expand Up @@ -104,6 +120,19 @@ public void beforeStop() {
}
});
}
initializeStreamingJobCleanerCron();
}

private void initializeStreamingJobCleanerCron() {
flintStreamingJobCleanerCron =
threadPool.scheduleWithFixedDelay(
new FlintStreamingJobCleanerTask(
dataSourceService,
flintIndexMetadataService,
stateStore,
emrServerlessClientFactory),
TimeValue.timeValueMinutes(15),
executorName());
}

private void reInitializeFlintIndexRetention() {
Expand All @@ -125,6 +154,8 @@ private void reInitializeFlintIndexRetention() {
public void offClusterManager() {
cancel(flintIndexRetentionCron);
flintIndexRetentionCron = null;
cancel(flintStreamingJobCleanerCron);
flintStreamingJobCleanerCron = null;
}

private void cancel(Cancellable cron) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.cluster;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceStatus;
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;

/** Cleaner task which alters the active streaming jobs of a disabled datasource. */
@RequiredArgsConstructor
public class FlintStreamingJobCleanerTask implements Runnable {

private final DataSourceService dataSourceService;
private final FlintIndexMetadataService flintIndexMetadataService;
private final StateStore stateStore;
private final EMRServerlessClientFactory emrServerlessClientFactory;

private static final Logger LOGGER = LogManager.getLogger(FlintStreamingJobCleanerTask.class);
protected static final AtomicBoolean isRunning = new AtomicBoolean(false);

@Override
public void run() {
if (!isRunning.compareAndSet(false, true)) {
LOGGER.info("Previous task is still running. Skipping this execution.");
return;
}
try {
LOGGER.info("Starting the cleaner task for disabled and deleted data sources.");
Map<String, FlintIndexMetadata> autoRefreshFlintIndicesMap = getAllAutoRefreshIndices();
autoRefreshFlintIndicesMap.forEach(
(autoRefreshIndex, flintIndexMetadata) -> {
try {
String datasourceName = getDataSourceName(flintIndexMetadata);
try {
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getDataSourceMetadata(datasourceName);
if (dataSourceMetadata.getStatus() == DataSourceStatus.DISABLED) {
alterAutoRefreshIndex(autoRefreshIndex, flintIndexMetadata, datasourceName);
}
} catch (DataSourceNotFoundException exception) {
// Datasource disabled.
vacuumAutoRefreshIndex(autoRefreshIndex, flintIndexMetadata, datasourceName);
}
} catch (Exception exception) {
LOGGER.error(
"Failed to alter/cancel index {}: {}",
autoRefreshIndex,
exception.getMessage(),
exception);
Metrics.getInstance()
.getNumericalMetric(MetricName.STREAMING_JOB_CLEANER_TASK_FAILURE_COUNT)
.increment();
}
});
} catch (Throwable error) {
LOGGER.info("Error while running the streaming job cleaner task: {}", error.getMessage());
Metrics.getInstance()
.getNumericalMetric(MetricName.STREAMING_JOB_CLEANER_TASK_FAILURE_COUNT)
.increment();
} finally {
isRunning.set(false);
}
}

private void vacuumAutoRefreshIndex(
String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata, String datasourceName) {
// When the datasource is deleted. Possibly Replace with VACUUM Operation.
LOGGER.info("Attempting to cancel auto refresh index: {}", autoRefreshIndex);
FlintIndexOpCancel flintIndexOpCancel =
new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClientFactory.getClient());
flintIndexOpCancel.apply(flintIndexMetadata);
LOGGER.info("Successfully cancelled index: {}", autoRefreshIndex);
}

private void alterAutoRefreshIndex(
String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata, String datasourceName) {
LOGGER.info("Attempting to alter index: {}", autoRefreshIndex);
FlintIndexOptions flintIndexOptions = new FlintIndexOptions();
flintIndexOptions.setOption(FlintIndexOptions.AUTO_REFRESH, "false");
FlintIndexOpAlter flintIndexOpAlter =
new FlintIndexOpAlter(
flintIndexOptions,
stateStore,
datasourceName,
emrServerlessClientFactory.getClient(),
flintIndexMetadataService);
flintIndexOpAlter.apply(flintIndexMetadata);
LOGGER.info("Successfully altered index: {}", autoRefreshIndex);
}

private String getDataSourceName(FlintIndexMetadata flintIndexMetadata) {
String kind = flintIndexMetadata.getKind();
switch (kind) {
case "mv":
return flintIndexMetadata.getName().split("\\.")[0];
case "skipping":
case "covering":
return flintIndexMetadata.getSource().split("\\.")[0];
default:
throw new IllegalArgumentException("Unknown flint index kind.");
}
}

private Map<String, FlintIndexMetadata> getAllAutoRefreshIndices() {
Map<String, FlintIndexMetadata> flintIndexMetadataHashMap =
flintIndexMetadataService.getFlintIndexMetadata("flint_*");
return flintIndexMetadataHashMap.entrySet().stream()
.filter(entry -> entry.getValue().getFlintIndexOptions().autoRefresh())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public class FlintIndexMetadata {
private final String jobId;
private final String appId;
private final String latestId;
private final String kind;
private final String source;
private final String name;
private final FlintIndexOptions flintIndexOptions;

public Optional<String> getLatestId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.WATERMARK_DELAY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.APP_ID;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.ENV_KEY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.KIND_KEY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.LATEST_ID_KEY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.META_KEY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.NAME_KEY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.OPTIONS_KEY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.PROPERTIES_KEY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.SERVERLESS_EMR_JOB_ID;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.SOURCE_KEY;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -149,9 +152,15 @@ private FlintIndexMetadata fromMetadata(String indexName, Map<String, Object> me
String jobId = (String) envMap.get(SERVERLESS_EMR_JOB_ID);
String appId = (String) envMap.getOrDefault(APP_ID, null);
String latestId = (String) metaMap.getOrDefault(LATEST_ID_KEY, null);
String kind = (String) metaMap.getOrDefault(KIND_KEY, null);
String name = (String) metaMap.getOrDefault(NAME_KEY, null);
String source = (String) metaMap.getOrDefault(SOURCE_KEY, null);
flintIndexMetadataBuilder.jobId(jobId);
flintIndexMetadataBuilder.appId(appId);
flintIndexMetadataBuilder.latestId(latestId);
flintIndexMetadataBuilder.name(name);
flintIndexMetadataBuilder.kind(kind);
flintIndexMetadataBuilder.source(source);
flintIndexMetadataBuilder.opensearchIndexName(indexName);
flintIndexMetadataBuilder.flintIndexOptions(flintIndexOptions);
return flintIndexMetadataBuilder.build();
Expand Down
Loading

0 comments on commit 6379aa8

Please sign in to comment.