Skip to content

Commit

Permalink
Top N indices auto deletion config & functionality
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 committed Dec 19, 2024
1 parent a97099c commit 4299935
Show file tree
Hide file tree
Showing 11 changed files with 276 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public Collection<Object> createComponents(
OperationalMetricsCounter.initialize(clusterService.getClusterName().toString(), metricsRegistry);
// create top n queries service
final QueryInsightsService queryInsightsService = new QueryInsightsService(
clusterService.getClusterSettings(),
clusterService,
threadPool,
client,
metricsRegistry,
Expand Down Expand Up @@ -145,6 +145,7 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING,
QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER,
QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,22 @@

package org.opensearch.plugin.insights.core.exporter;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_DELETE_AFTER_VALUE;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequestBuilder;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
Expand All @@ -36,6 +42,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter {
private final Logger logger = LogManager.getLogger();
private final Client client;
private DateTimeFormatter indexPattern;
private int deleteAfter;

/**
* Constructor of LocalIndexExporter
Expand All @@ -46,6 +53,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter {
public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) {
    // Collaborators supplied by the caller.
    this.client = client;
    this.indexPattern = indexPattern;
    // Retention starts at the default until setDeleteAfter is called.
    this.deleteAfter = DEFAULT_DELETE_AFTER_VALUE;
}

/**
Expand All @@ -61,11 +69,9 @@ public DateTimeFormatter getIndexPattern() {
* Setter of indexPattern
*
* @param indexPattern index pattern
* @return the current LocalIndexExporter
*/
public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) {
void setIndexPattern(DateTimeFormatter indexPattern) {
this.indexPattern = indexPattern;
return this;
}

/**
Expand Down Expand Up @@ -113,4 +119,49 @@ public void close() {
private String getDateTimeFromFormat() {
    // Render the current UTC instant with the configured index pattern.
    final DateTime nowUtc = DateTime.now(DateTimeZone.UTC);
    return indexPattern.print(nowUtc);
}

/**
 * Update the data retention period used by {@link #deleteExpiredIndices(Map)}.
 *
 * @param deleteAfter retention period in days; matching indices created at least this many days ago
 *                    are deleted on the next call to {@link #deleteExpiredIndices(Map)}
 */
public void setDeleteAfter(final int deleteAfter) {
    this.deleteAfter = deleteAfter;
}

/**
 * Delete Top N local indices older than the configured data retention period.
 *
 * An index is deleted when its name parses with the current index pattern and its creation date
 * is at or before {@code now - deleteAfter days}. Deletion requests are issued asynchronously;
 * a failed deletion is logged and counted under
 * {@link OperationalMetric#LOCAL_INDEX_EXPORTER_DELETE_FAILURES} instead of being silently dropped.
 *
 * @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata}
 */
public void deleteExpiredIndices(final Map<String, IndexMetadata> indexMetadataMap) {
    final long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(deleteAfter);
    for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
        final String indexName = entry.getKey();
        if (!matchesPattern(indexName, indexPattern)) {
            // Not one of the exporter's indices; never touch it.
            continue;
        }
        if (entry.getValue().getCreationDate() <= expirationMillisLong) {
            // Pass a listener so that a rejected or failed delete is observable; the listener-less
            // overload returns a future that nobody inspects, which hides every failure.
            client.admin().indices().delete(new DeleteIndexRequest(indexName), ActionListener.wrap(response -> {}, e -> {
                OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_DELETE_FAILURES);
                logger.error("Failed to delete expired Top N local index [" + indexName + "]", e);
            }));
        }
    }
}

/**
 * Tells whether a string can be parsed by the supplied {@link DateTimeFormatter}.
 *
 * The check is performed by attempting {@code formatter.parseDateTime(input)}; any exception
 * raised by that attempt is treated as "does not match".
 *
 * @param input The candidate string, e.g. an index name.
 * @param formatter The formatter whose pattern the string is tested against.
 * @return true if parsing completes without throwing, false otherwise.
 */
static boolean matchesPattern(final String input, final DateTimeFormatter formatter) {
    boolean parsed;
    try {
        // The parsed value itself is irrelevant; only success or failure matters.
        formatter.parseDateTime(input);
        parsed = true;
    } catch (Exception e) {
        parsed = false;
    }
    return parsed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
public enum OperationalMetric {
LOCAL_INDEX_READER_PARSING_EXCEPTIONS("Number of errors when parsing with LocalIndexReader"),
LOCAL_INDEX_EXPORTER_BULK_FAILURES("Number of failures when ingesting Query Insights data to local indices"),
LOCAL_INDEX_EXPORTER_DELETE_FAILURES("Number of failures when deleting local indices"),
LOCAL_INDEX_EXPORTER_EXCEPTIONS("Number of exceptions in Query Insights LocalIndexExporter"),
INVALID_EXPORTER_TYPE_FAILURES("Number of invalid exporter type failures"),
INVALID_INDEX_PATTERN_EXCEPTIONS("Number of invalid index pattern exceptions"),
DATA_INGEST_EXCEPTIONS("Number of exceptions during data ingest in Query Insights"),
QUERY_CATEGORIZE_EXCEPTIONS("Number of exceptions when categorizing the queries"),
EXPORTER_FAIL_TO_CLOSE_EXCEPTION("Number of failures when closing the exporter"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings;

import java.io.IOException;
Expand All @@ -19,13 +20,14 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand All @@ -52,13 +54,15 @@ public class QueryInsightsService extends AbstractLifecycleComponent {

private static final Logger logger = LogManager.getLogger(QueryInsightsService.class);

private final ClusterService clusterService;

/**
* The internal OpenSearch thread pool that execute async processing and exporting tasks
*/
private final ThreadPool threadPool;

/**
* Services to capture top n queries for different metric types
* Map of {@link MetricType} to associated {@link TopQueriesService}
*/
private final Map<MetricType, TopQueriesService> topQueriesServices;

Expand All @@ -73,10 +77,10 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
private final LinkedBlockingQueue<SearchQueryRecord> queryRecordsQueue;

/**
* Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when
* List of references to delayed operations {@link Scheduler.Cancellable} so they can be cancelled when
* the service closed concurrently.
*/
protected volatile Scheduler.Cancellable scheduledFuture;
protected volatile List<Scheduler.Cancellable> scheduledFutures;

/**
* Query Insights exporter factory
Expand All @@ -102,20 +106,21 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
/**
* Constructor of the QueryInsightsService
*
* @param clusterSettings OpenSearch cluster level settings
* @param clusterService OpenSearch cluster service
* @param threadPool The OpenSearch thread pool to run async tasks
* @param client OS client
* @param metricsRegistry Opentelemetry Metrics registry
* @param namedXContentRegistry NamedXContentRegistry for parsing purposes
*/
@Inject
public QueryInsightsService(
final ClusterSettings clusterSettings,
final ClusterService clusterService,
final ThreadPool threadPool,
final Client client,
final MetricsRegistry metricsRegistry,
final NamedXContentRegistry namedXContentRegistry
) {
this.clusterService = clusterService;
enableCollect = new HashMap<>();
queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY);
this.threadPool = threadPool;
Expand All @@ -132,11 +137,18 @@ public QueryInsightsService(
);
}
for (MetricType type : MetricType.allMetricTypes()) {
clusterSettings.addSettingsUpdateConsumer(
getExporterSettings(type),
(settings -> setExporterAndReader(type, settings)),
(settings -> validateExporterAndReaderConfig(type, settings))
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
getExporterSettings(type),
(settings -> setExporterAndReader(type, settings)),
(settings -> validateExporterAndReaderConfig(type, settings))
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_EXPORTER_DELETE_AFTER,
(settings -> setExporterDeleteAfter(type, settings)),
(TopQueriesService::validateExporterDeleteAfter)
);
}

this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry);
Expand Down Expand Up @@ -389,14 +401,26 @@ public void setTopNSize(final MetricType type, final int topNSize) {
* @param type {@link MetricType}
* @param settings exporter and reader settings
*/
public void setExporterAndReader(final MetricType type, final Settings settings) {
private void setExporterAndReader(final MetricType type, final Settings settings) {
if (topQueriesServices.containsKey(type)) {
TopQueriesService tqs = topQueriesServices.get(type);
tqs.setExporter(settings);
tqs.setReader(settings, namedXContentRegistry);
}
}

/**
 * Forward a new retention period to the {@link TopQueriesService} registered for a metric type.
 * Does nothing when no service is registered for that type.
 *
 * @param type {@link MetricType} whose service should receive the value
 * @param deleteAfter the number of days after which Top N local indices should be deleted
 */
private void setExporterDeleteAfter(final MetricType type, final int deleteAfter) {
    if (!topQueriesServices.containsKey(type)) {
        return;
    }
    final TopQueriesService service = topQueriesServices.get(type);
    service.setExporterDeleteAfter(deleteAfter);
}

/**
* Get search query categorizer object
* @return SearchQueryCategorizer object
Expand All @@ -421,18 +445,32 @@ public void validateExporterAndReaderConfig(final MetricType type, final Setting
@Override
protected void doStart() {
if (isAnyFeatureEnabled()) {
scheduledFuture = threadPool.scheduleWithFixedDelay(
this::drainRecords,
QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL,
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR
scheduledFutures = new ArrayList<>();
scheduledFutures.add(
threadPool.scheduleWithFixedDelay(
this::drainRecords,
QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL,
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR
)
);
scheduledFutures.add(
threadPool.scheduleWithFixedDelay(
this::deleteExpiredIndices,
new TimeValue(1, TimeUnit.DAYS), // Check for deletable indices once per day
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR
)
);
}
}

@Override
protected void doStop() {
if (scheduledFuture != null) {
scheduledFuture.cancel();
if (scheduledFutures != null) {
for (Scheduler.Cancellable cancellable : scheduledFutures) {
if (cancellable != null) {
cancellable.cancel();
}
}
}
}

Expand Down Expand Up @@ -462,4 +500,13 @@ public QueryInsightsHealthStats getHealthStats() {
topQueriesHealthStatsMap
);
}

/**
 * Delete Top N local indices older than the configured data retention period.
 *
 * Asks the {@link TopQueriesService} of every metric type to purge its expired indices, handing
 * each one the index metadata from the current cluster state. Metric types without a registered
 * service are skipped, matching the presence checks done by the other per-type methods of this
 * class, so one missing entry cannot abort the whole pass with a NullPointerException.
 */
private void deleteExpiredIndices() {
    for (MetricType metricType : MetricType.allMetricTypes()) {
        final TopQueriesService service = topQueriesServices.get(metricType);
        if (service != null) {
            service.deleteExpiredIndices(clusterService.state().metadata().indices());
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MAX_DELETE_AFTER_VALUE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MIN_DELETE_AFTER_VALUE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;

import java.io.IOException;
Expand All @@ -33,9 +35,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.joda.time.DateTime;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.plugin.insights.core.exporter.LocalIndexExporter;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.core.exporter.SinkType;
Expand Down Expand Up @@ -536,4 +540,45 @@ private void drain() {
public TopQueriesHealthStats getHealthStats() {
    // Snapshot of the current store size plus the grouper's own health stats.
    return new TopQueriesHealthStats(
        topQueriesStore.size(),
        queryGrouper.getHealthStats()
    );
}

/**
 * Validate the exporter delete after value.
 *
 * Accepts values in the inclusive range {@code [MIN_DELETE_AFTER_VALUE, MAX_DELETE_AFTER_VALUE]}.
 * For any other value an operational metric is incremented and the value is rejected.
 *
 * @param deleteAfter the number of days after which Top N local indices should be deleted
 * @throws IllegalArgumentException if {@code deleteAfter} is outside the allowed range
 */
static void validateExporterDeleteAfter(final int deleteAfter) {
    final boolean withinBounds = deleteAfter >= MIN_DELETE_AFTER_VALUE && deleteAfter <= MAX_DELETE_AFTER_VALUE;
    if (withinBounds) {
        return;
    }
    // NOTE(review): this counts under INVALID_EXPORTER_TYPE_FAILURES although the failure is an
    // out-of-range retention value, not an exporter type - confirm this is the intended metric.
    OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.INVALID_EXPORTER_TYPE_FAILURES);
    final String message = String.format(
        Locale.ROOT,
        "Invalid exporter delete_after_days setting [%d], value should be an integer between %d and %d.",
        deleteAfter,
        MIN_DELETE_AFTER_VALUE,
        MAX_DELETE_AFTER_VALUE
    );
    throw new IllegalArgumentException(message);
}

/**
 * Apply a new retention period to the current exporter, provided it is a {@link LocalIndexExporter}.
 * Any other exporter, or no exporter at all, leaves the call without effect.
 *
 * NOTE(review): the value is not stored in this service, so it appears that an exporter created
 * after this call would start from its own default - confirm against setExporter.
 *
 * @param deleteAfter the number of days after which Top N local indices should be deleted
 */
void setExporterDeleteAfter(final int deleteAfter) {
    // LocalIndexExporter is final, so instanceof selects exactly the same objects as a class equality check.
    if (exporter instanceof LocalIndexExporter) {
        ((LocalIndexExporter) exporter).setDeleteAfter(deleteAfter);
    }
}

/**
 * Delete Top N local indices older than the configured data retention period.
 *
 * The work is handed to the Query Insights executor and only happens when the current exporter
 * is a {@link LocalIndexExporter}. The exporter reference is captured once, before scheduling,
 * so the task operates on the exporter that passed the type check even if the field is
 * reassigned or cleared before the task runs.
 *
 * @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata}
 */
void deleteExpiredIndices(final Map<String, IndexMetadata> indexMetadataMap) {
    // Read the mutable field exactly once; re-reading it inside the deferred task could observe
    // a different (or null) exporter and fail with a ClassCastException or NullPointerException.
    final QueryInsightsExporter currentExporter = exporter;
    // LocalIndexExporter is final, so instanceof is equivalent to an exact class comparison.
    if (currentExporter instanceof LocalIndexExporter) {
        final LocalIndexExporter localIndexExporter = (LocalIndexExporter) currentExporter;
        threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> localIndexExporter.deleteExpiredIndices(indexMetadataMap));
    }
}
}
Loading

0 comments on commit 4299935

Please sign in to comment.