Skip to content

Commit

Permalink
FlintStreamingJobCleanerTask Implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsimanohar committed Mar 19, 2024
1 parent 8374cb6 commit bc8edb4
Show file tree
Hide file tree
Showing 17 changed files with 940 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ public enum Key {
"plugins.query.executionengine.spark.session_inactivity_timeout_millis"),

/** Async query Settings * */
ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled");
ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled"),
STREAMING_JOB_HOUSEKEEPER_INTERVAL(
"plugins.query.executionengine.spark.streamingjobs.housekeeper.interval");

@Getter private final String keyValue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata) {
public void deleteDataSourceMetadata(String datasourceName) {
DeleteRequest deleteRequest = new DeleteRequest(DATASOURCE_INDEX_NAME);
deleteRequest.id(datasourceName);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
ActionFuture<DeleteResponse> deleteResponseActionFuture;
try (ThreadContext.StoredContext storedContext =
client.threadPool().getThreadContext().stashContext()) {
Expand Down
35 changes: 35 additions & 0 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -595,3 +595,38 @@ Request::
}
}

plugins.query.executionengine.spark.streamingjobs.housekeeper.interval
===============================

Description
-----------
This setting specifies the interval at which the streaming job housekeeper runs to clean up streaming jobs associated with deleted and disabled data sources.
The default configuration executes this cleanup every 15 minutes.

* Default Value: 15 minutes

To modify the TTL to 30 minutes for example, use this command:

Request ::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \
... -d '{"transient":{"plugins.query.executionengine.spark.streamingjobs.housekeeper.interval":"30m"}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"query": {
"executionengine": {
"spark": {
"streamingjobs": {
"housekeeper": {
"interval": "30m"
}
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public enum MetricName {
EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT("emr_cancel_job_request_failure_count"),
EMR_STREAMING_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"),
EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT("emr_interactive_jobs_creation_count"),
EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count");
EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count"),
STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT("streaming_job_housekeeper_task_failure_count");

private String name;

Expand Down Expand Up @@ -91,6 +92,7 @@ public static List<String> getNames() {
.add(ASYNC_QUERY_CREATE_API_REQUEST_COUNT)
.add(ASYNC_QUERY_GET_API_REQUEST_COUNT)
.add(ASYNC_QUERY_CANCEL_API_REQUEST_COUNT)
.add(STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT)
.build();

public boolean isNumerical() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.opensearch.common.settings.Settings.EMPTY;
import static org.opensearch.common.unit.TimeValue.timeValueDays;
import static org.opensearch.common.unit.TimeValue.timeValueMinutes;
import static org.opensearch.sql.common.setting.Settings.Key.ENCYRPTION_MASTER_KEY;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -193,6 +194,13 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<TimeValue> STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING =
Setting.positiveTimeSetting(
Key.STREAMING_JOB_HOUSEKEEPER_INTERVAL.getKeyValue(),
timeValueMinutes(15),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -313,6 +321,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.SESSION_INACTIVITY_TIMEOUT_MILLIS,
SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING,
new Updater((Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)));
register(
settingBuilder,
clusterSettings,
Key.STREAMING_JOB_HOUSEKEEPER_INTERVAL,
STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING,
new Updater((Key.STREAMING_JOB_HOUSEKEEPER_INTERVAL)));
defaultSettings = settingBuilder.build();
}

Expand Down
10 changes: 9 additions & 1 deletion plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.cluster.ClusterManagerEventListener;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl;
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
Expand Down Expand Up @@ -220,8 +223,13 @@ public Collection<Object> createComponents(
Clock.systemUTC(),
OpenSearchSettings.SESSION_INDEX_TTL_SETTING,
OpenSearchSettings.RESULT_INDEX_TTL_SETTING,
OpenSearchSettings.STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING,
OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING,
environment.settings());
environment.settings(),
dataSourceService,
injector.getInstance(FlintIndexMetadataServiceImpl.class),
injector.getInstance(StateStore.class),
injector.getInstance(EMRServerlessClientFactory.class));
return ImmutableList.of(
dataSourceService,
injector.getInstance(AsyncQueryExecutorService.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,29 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.threadpool.Scheduler.Cancellable;
import org.opensearch.threadpool.ThreadPool;

public class ClusterManagerEventListener implements LocalNodeClusterManagerListener {

private Cancellable flintIndexRetentionCron;
private Cancellable flintStreamingJobHouseKeeperCron;
private ClusterService clusterService;
private ThreadPool threadPool;
private Client client;
private Clock clock;
private DataSourceService dataSourceService;
private FlintIndexMetadataService flintIndexMetadataService;
private StateStore stateStore;
private EMRServerlessClientFactory emrServerlessClientFactory;
private Duration sessionTtlDuration;
private Duration resultTtlDuration;
private TimeValue streamingJobHouseKeepingInterval;
private boolean isAutoIndexManagementEnabled;

public ClusterManagerEventListener(
Expand All @@ -41,16 +51,25 @@ public ClusterManagerEventListener(
Clock clock,
Setting<TimeValue> sessionTtl,
Setting<TimeValue> resultTtl,
Setting<TimeValue> streamingJobHouseKeepingInterval,
Setting<Boolean> isAutoIndexManagementEnabledSetting,
Settings settings) {
Settings settings,
DataSourceService dataSourceService,
FlintIndexMetadataService flintIndexMetadataService,
StateStore stateStore,
EMRServerlessClientFactory emrServerlessClientFactory) {
this.clusterService = clusterService;
this.threadPool = threadPool;
this.client = client;
this.clusterService.addLocalNodeClusterManagerListener(this);
this.clock = clock;

this.dataSourceService = dataSourceService;
this.flintIndexMetadataService = flintIndexMetadataService;
this.stateStore = stateStore;
this.emrServerlessClientFactory = emrServerlessClientFactory;
this.sessionTtlDuration = toDuration(sessionTtl.get(settings));
this.resultTtlDuration = toDuration(resultTtl.get(settings));
this.streamingJobHouseKeepingInterval = streamingJobHouseKeepingInterval.get(settings);

clusterService
.getClusterSettings()
Expand Down Expand Up @@ -87,6 +106,16 @@ public ClusterManagerEventListener(
}
}
});

clusterService
.getClusterSettings()
.addSettingsUpdateConsumer(
streamingJobHouseKeepingInterval,
it -> {
this.streamingJobHouseKeepingInterval = it;
cancel(flintStreamingJobHouseKeeperCron);
initializeStreamingJobHouseKeeperCron();
});
}

@Override
Expand All @@ -104,6 +133,19 @@ public void beforeStop() {
}
});
}
initializeStreamingJobHouseKeeperCron();
}

private void initializeStreamingJobHouseKeeperCron() {
flintStreamingJobHouseKeeperCron =
threadPool.scheduleWithFixedDelay(
new FlintStreamingJobHouseKeeperTask(
dataSourceService,
flintIndexMetadataService,
stateStore,
emrServerlessClientFactory),
streamingJobHouseKeepingInterval,
executorName());
}

private void reInitializeFlintIndexRetention() {
Expand All @@ -125,6 +167,8 @@ private void reInitializeFlintIndexRetention() {
public void offClusterManager() {
cancel(flintIndexRetentionCron);
flintIndexRetentionCron = null;
cancel(flintStreamingJobHouseKeeperCron);
flintStreamingJobHouseKeeperCron = null;
}

private void cancel(Cancellable cron) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.cluster;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceStatus;
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop;

/** Cleaner task which alters the active streaming jobs of a disabled datasource. */
@RequiredArgsConstructor
public class FlintStreamingJobHouseKeeperTask implements Runnable {

private final DataSourceService dataSourceService;
private final FlintIndexMetadataService flintIndexMetadataService;
private final StateStore stateStore;
private final EMRServerlessClientFactory emrServerlessClientFactory;

private static final Logger LOGGER = LogManager.getLogger(FlintStreamingJobHouseKeeperTask.class);
protected static final AtomicBoolean isRunning = new AtomicBoolean(false);

@Override
public void run() {
if (!isRunning.compareAndSet(false, true)) {
LOGGER.info("Previous task is still running. Skipping this execution.");
return;
}
try {
LOGGER.info("Starting the cleaner task for disabled and deleted data sources.");
Map<String, FlintIndexMetadata> autoRefreshFlintIndicesMap = getAllAutoRefreshIndices();
autoRefreshFlintIndicesMap.forEach(
(autoRefreshIndex, flintIndexMetadata) -> {
try {
String datasourceName = getDataSourceName(flintIndexMetadata);
try {
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getDataSourceMetadata(datasourceName);
if (dataSourceMetadata.getStatus() == DataSourceStatus.DISABLED) {
LOGGER.info("Datasource is disabled for autoRefreshIndex: {}", autoRefreshIndex);
alterAutoRefreshIndex(autoRefreshIndex, flintIndexMetadata, datasourceName);
} else {
LOGGER.debug("Datasource is enabled for autoRefreshIndex : {}", autoRefreshIndex);
}
} catch (DataSourceNotFoundException exception) {
// Datasource disabled.
LOGGER.info("Datasource is deleted for autoRefreshIndex: {}", autoRefreshIndex);
dropAutoRefreshIndex(autoRefreshIndex, flintIndexMetadata, datasourceName);
}
} catch (Exception exception) {
LOGGER.error(
"Failed to alter/cancel index {}: {}",
autoRefreshIndex,
exception.getMessage(),
exception);
Metrics.getInstance()
.getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT)
.increment();
}
});
} catch (Throwable error) {
LOGGER.error("Error while running the streaming job cleaner task: {}", error.getMessage());
Metrics.getInstance()
.getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT)
.increment();
} finally {
isRunning.set(false);
}
}

private void dropAutoRefreshIndex(
String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata, String datasourceName) {
// When the datasource is deleted. Possibly Replace with VACUUM Operation.
LOGGER.info("Attempting to drop auto refresh index: {}", autoRefreshIndex);
FlintIndexOpDrop flintIndexOpDrop =
new FlintIndexOpDrop(stateStore, datasourceName, emrServerlessClientFactory.getClient());
flintIndexOpDrop.apply(flintIndexMetadata);
LOGGER.info("Successfully dropped index: {}", autoRefreshIndex);
}

private void alterAutoRefreshIndex(
String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata, String datasourceName) {
LOGGER.info("Attempting to alter index: {}", autoRefreshIndex);
FlintIndexOptions flintIndexOptions = new FlintIndexOptions();
flintIndexOptions.setOption(FlintIndexOptions.AUTO_REFRESH, "false");
FlintIndexOpAlter flintIndexOpAlter =
new FlintIndexOpAlter(
flintIndexOptions,
stateStore,
datasourceName,
emrServerlessClientFactory.getClient(),
flintIndexMetadataService);
flintIndexOpAlter.apply(flintIndexMetadata);
LOGGER.info("Successfully altered index: {}", autoRefreshIndex);
}

private String getDataSourceName(FlintIndexMetadata flintIndexMetadata) {
String kind = flintIndexMetadata.getKind();
switch (kind) {
case "mv":
return flintIndexMetadata.getName().split("\\.")[0];
case "skipping":
case "covering":
return flintIndexMetadata.getSource().split("\\.")[0];
default:
throw new IllegalArgumentException(String.format("Unknown flint index kind: %s", kind));
}
}

private Map<String, FlintIndexMetadata> getAllAutoRefreshIndices() {
Map<String, FlintIndexMetadata> flintIndexMetadataHashMap =
flintIndexMetadataService.getFlintIndexMetadata("flint_*");
return flintIndexMetadataHashMap.entrySet().stream()
.filter(entry -> entry.getValue().getFlintIndexOptions().autoRefresh())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public class FlintIndexMetadata {
private final String jobId;
private final String appId;
private final String latestId;
private final String kind;
private final String source;
private final String name;
private final FlintIndexOptions flintIndexOptions;

public Optional<String> getLatestId() {
Expand Down
Loading

0 comments on commit bc8edb4

Please sign in to comment.