Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add rollover & archival mechanism for correlation history indices #670

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ public List<Setting<?>> getSettings() {
SecurityAnalyticsSettings.FINDING_HISTORY_INDEX_MAX_AGE,
SecurityAnalyticsSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
SecurityAnalyticsSettings.FINDING_HISTORY_RETENTION_PERIOD,
SecurityAnalyticsSettings.CORRELATION_HISTORY_MAX_DOCS,
SecurityAnalyticsSettings.CORRELATION_HISTORY_INDEX_MAX_AGE,
SecurityAnalyticsSettings.CORRELATION_HISTORY_ROLLOVER_PERIOD,
SecurityAnalyticsSettings.CORRELATION_HISTORY_RETENTION_PERIOD,
SecurityAnalyticsSettings.IS_CORRELATION_INDEX_SETTING,
SecurityAnalyticsSettings.CORRELATION_TIME_WINDOW,
SecurityAnalyticsSettings.DEFAULT_MAPPING_SCHEMA,
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public class SecurityAnalyticsSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> CORRELATION_HISTORY_ROLLOVER_PERIOD = Setting.positiveTimeSetting(
"plugins.security_analytics.correlation_history_rollover_period",
TimeValue.timeValueHours(12),
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> ALERT_HISTORY_INDEX_MAX_AGE = Setting.positiveTimeSetting(
"plugins.security_analytics.alert_history_max_age",
new TimeValue(30, TimeUnit.DAYS),
Expand All @@ -55,6 +61,12 @@ public class SecurityAnalyticsSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> CORRELATION_HISTORY_INDEX_MAX_AGE = Setting.positiveTimeSetting(
"plugins.security_analytics.correlation_history_max_age",
new TimeValue(30, TimeUnit.DAYS),
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<Long> ALERT_HISTORY_MAX_DOCS = Setting.longSetting(
"plugins.security_analytics.alert_history_max_docs",
1000L,
Expand All @@ -69,6 +81,13 @@ public class SecurityAnalyticsSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated
);

public static final Setting<Long> CORRELATION_HISTORY_MAX_DOCS = Setting.longSetting(
"plugins.security_analytics.correlation_history_max_docs",
1000L,
0L,
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> ALERT_HISTORY_RETENTION_PERIOD = Setting.positiveTimeSetting(
"plugins.security_analytics.alert_history_retention_period",
new TimeValue(60, TimeUnit.DAYS),
Expand All @@ -81,6 +100,12 @@ public class SecurityAnalyticsSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> CORRELATION_HISTORY_RETENTION_PERIOD = Setting.positiveTimeSetting(
"plugins.security_analytics.correlation_history_retention_period",
new TimeValue(60, TimeUnit.DAYS),
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> REQUEST_TIMEOUT = Setting.positiveTimeSetting(
"plugins.security_analytics.request_timeout",
TimeValue.timeValueSeconds(10),
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public void onResponse(AcknowledgedResponse response) {
public void onFailure(Exception e) {
onFailures(e);
}
}
},
false
);
} else {
indexCorrelationRule();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ public void onResponse(AcknowledgedResponse response) {
public void onFailure(Exception e) {
onFailures(e);
}
});
}, false
);
} else {
prepareCustomLogTypeIndexing();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,8 @@ public void onResponse(AcknowledgedResponse response) {
public void onFailure(Exception e) {
onFailures(e);
}
}
},
false
);
} else {
prepareDetectorIndexing();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ public void onResponse(AcknowledgedResponse response) {
public void onFailure(Exception e) {
onFailures(e);
}
}
},
false
);
} else {
prepareRuleIndexing();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
searchSourceBuilder.fetchSource(true);
searchSourceBuilder.size(10000);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(CorrelationIndices.CORRELATION_INDEX);
searchRequest.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);

Check warning on line 114 in src/main/java/org/opensearch/securityanalytics/transport/TransportListCorrelationAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportListCorrelationAction.java#L114

Added line #L114 was not covered by tests
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
scoreSearchSourceBuilder.fetchSource(true);
scoreSearchSourceBuilder.size(1);
SearchRequest scoreSearchRequest = new SearchRequest();
scoreSearchRequest.indices(CorrelationIndices.CORRELATION_INDEX);
scoreSearchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX);

Check warning on line 136 in src/main/java/org/opensearch/securityanalytics/transport/TransportSearchCorrelationAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportSearchCorrelationAction.java#L136

Added line #L136 was not covered by tests
scoreSearchRequest.source(scoreSearchSourceBuilder);
scoreSearchRequest.preference(Preference.PRIMARY_FIRST.type());

Expand All @@ -156,7 +156,7 @@
searchSourceBuilder.fetchField("counter");
searchSourceBuilder.size(1);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(CorrelationIndices.CORRELATION_INDEX);
searchRequest.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);

Check warning on line 159 in src/main/java/org/opensearch/securityanalytics/transport/TransportSearchCorrelationAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportSearchCorrelationAction.java#L159

Added line #L159 was not covered by tests
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());

Expand All @@ -168,11 +168,11 @@

for (SearchHit hit: hits) {
long counter = hit.getFields().get("counter").<Long>getValue();
float[] query = new float[101];
for (int i = 0; i < 100; ++i) {
float[] query = new float[3];

Check warning on line 171 in src/main/java/org/opensearch/securityanalytics/transport/TransportSearchCorrelationAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportSearchCorrelationAction.java#L171

Added line #L171 was not covered by tests
for (int i = 0; i < 2; ++i) {
query[i] = (2.0f * ((float) counter) - 50.0f) / 2.0f;
}
query[100] = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue();
query[2] = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue();

Check warning on line 175 in src/main/java/org/opensearch/securityanalytics/transport/TransportSearchCorrelationAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportSearchCorrelationAction.java#L175

Added line #L175 was not covered by tests

CorrelationQueryBuilder correlationQueryBuilder = new CorrelationQueryBuilder("corr_vector", query, noOfNearbyFindings, QueryBuilders.boolQuery()
.mustNot(QueryBuilders.matchQuery(
Expand All @@ -188,7 +188,7 @@
searchSourceBuilder.fetchSource(true);
searchSourceBuilder.size(noOfNearbyFindings);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(CorrelationIndices.CORRELATION_INDEX);
searchRequest.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);

Check warning on line 191 in src/main/java/org/opensearch/securityanalytics/transport/TransportSearchCorrelationAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportSearchCorrelationAction.java#L191

Added line #L191 was not covered by tests
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
Expand All @@ -33,7 +34,13 @@
public class CorrelationIndices {

private static final Logger log = LogManager.getLogger(CorrelationIndices.class);
public static final String CORRELATION_INDEX = ".opensearch-sap-correlation-history";

public static final String CORRELATION_METADATA_INDEX = ".opensearch-sap-correlation-metadata";
public static final String CORRELATION_HISTORY_INDEX_PATTERN = "<.opensearch-sap-correlation-history-{now/d}-1>";

public static final String CORRELATION_HISTORY_INDEX_PATTERN_REGEXP = ".opensearch-sap-correlation-history*";

public static final String CORRELATION_HISTORY_WRITE_INDEX = ".opensearch-sap-correlation-history-write";
public static final long FIXED_HISTORICAL_INTERVAL = 24L * 60L * 60L * 20L * 1000L;

private final Client client;
Expand All @@ -51,16 +58,35 @@

public void initCorrelationIndex(ActionListener<CreateIndexResponse> actionListener) throws IOException {
if (!correlationIndexExists()) {
CreateIndexRequest indexRequest = new CreateIndexRequest(CORRELATION_INDEX)
CreateIndexRequest indexRequest = new CreateIndexRequest(CORRELATION_HISTORY_INDEX_PATTERN)
.mapping(correlationMappings())
.settings(Settings.builder().put("index.hidden", true).put("index.correlation", true).build());
indexRequest.alias(new Alias(CORRELATION_HISTORY_WRITE_INDEX));
client.admin().indices().create(indexRequest, actionListener);
} else {
actionListener.onResponse(new CreateIndexResponse(true, true, CORRELATION_HISTORY_INDEX_PATTERN));

Check warning on line 67 in src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java#L61-L67

Added lines #L61 - L67 were not covered by tests
}
}

Check warning on line 69 in src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java#L69

Added line #L69 was not covered by tests

public void initCorrelationMetadataIndex(ActionListener<CreateIndexResponse> actionListener) throws IOException {
if (!correlationMetadataIndexExists()) {
CreateIndexRequest indexRequest = new CreateIndexRequest(CORRELATION_METADATA_INDEX)

Check warning on line 73 in src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java#L73

Added line #L73 was not covered by tests
.mapping(correlationMappings())
.settings(Settings.builder().put("index.hidden", true).put("index.correlation", true).build());
client.admin().indices().create(indexRequest, actionListener);
} else {
actionListener.onResponse(new CreateIndexResponse(true, true, CORRELATION_METADATA_INDEX));

Check warning on line 78 in src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java#L77-L78

Added lines #L77 - L78 were not covered by tests
}
}

public boolean correlationIndexExists() {
ClusterState clusterState = clusterService.state();
return clusterState.getRoutingTable().hasIndex(CORRELATION_INDEX);
return clusterState.metadata().hasAlias(CORRELATION_HISTORY_WRITE_INDEX);

Check warning on line 84 in src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java#L84

Added line #L84 was not covered by tests
}

public boolean correlationMetadataIndexExists() {
ClusterState clusterState = clusterService.state();
return clusterState.metadata().hasIndex(CORRELATION_METADATA_INDEX);

Check warning on line 89 in src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java#L88-L89

Added lines #L88 - L89 were not covered by tests
}

public void setupCorrelationIndex(TimeValue indexTimeout, Long setupTimestamp, ActionListener<BulkResponse> listener) {
Expand All @@ -76,7 +102,7 @@
builder.field("scoreTimestamp", 0L);
builder.endObject();

IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX)
IndexRequest indexRequest = new IndexRequest(CORRELATION_METADATA_INDEX)

Check warning on line 105 in src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java#L105

Added line #L105 was not covered by tests
.source(builder)
.timeout(indexTimeout);

Expand All @@ -85,7 +111,7 @@
scoreBuilder.field("root", false);
scoreBuilder.endObject();

IndexRequest scoreIndexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX)
IndexRequest scoreIndexRequest = new IndexRequest(CORRELATION_METADATA_INDEX)

Check warning on line 114 in src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java#L114

Added line #L114 was not covered by tests
.source(scoreBuilder)
.timeout(indexTimeout);

Expand All @@ -100,16 +126,4 @@
log.error(ex);
}
}

public ClusterIndexHealth correlationIndexHealth() {
ClusterIndexHealth indexHealth = null;

if (correlationIndexExists()) {
IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(CORRELATION_INDEX);
IndexMetadata indexMetadata = clusterService.state().metadata().index(CORRELATION_INDEX);

indexHealth = new ClusterIndexHealth(indexMetadata, indexRoutingTable);
}
return indexHealth;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
*/
package org.opensearch.securityanalytics.util;

import java.util.Optional;
import java.util.SortedMap;

import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.support.IndicesOptions;
Expand All @@ -24,6 +27,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

public class IndexUtils {

Expand All @@ -35,6 +39,10 @@
public static Boolean customRuleIndexUpdated = false;
public static Boolean prePackagedRuleIndexUpdated = false;
public static Boolean correlationIndexUpdated = false;

public static Boolean correlationMetadataIndexUpdated = false;

public static String lastUpdatedCorrelationHistoryIndex = null;
public static Boolean correlationRuleIndexUpdated = false;

public static Boolean customLogTypeIndexUpdated = false;
Expand All @@ -53,6 +61,10 @@

public static void correlationIndexUpdated() { correlationIndexUpdated = true; }

public static void correlationMetadataIndexUpdated() {
correlationMetadataIndexUpdated = true;
}

Check warning on line 66 in src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java#L65-L66

Added lines #L65 - L66 were not covered by tests

public static void correlationRuleIndexUpdated() {
correlationRuleIndexUpdated = true;
}
Expand Down Expand Up @@ -112,11 +124,20 @@
String mapping,
ClusterState clusterState,
IndicesAdminClient client,
ActionListener<AcknowledgedResponse> actionListener
ActionListener<AcknowledgedResponse> actionListener,
boolean alias
) throws IOException {
if (clusterState.metadata().indices().containsKey(index)) {
if (shouldUpdateIndex(clusterState.metadata().index(index), mapping)) {
PutMappingRequest putMappingRequest = new PutMappingRequest(index).source(mapping, XContentType.JSON);
String targetIndex = index;

Check warning on line 130 in src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java#L130

Added line #L130 was not covered by tests
if (alias) {
targetIndex = IndexUtils.getIndexNameWithAlias(clusterState, index);

Check warning on line 132 in src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java#L132

Added line #L132 was not covered by tests
}
if (targetIndex.equals(IndexUtils.lastUpdatedCorrelationHistoryIndex)) {
return;

Check warning on line 135 in src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java#L135

Added line #L135 was not covered by tests
}

if (clusterState.metadata().indices().containsKey(targetIndex)) {
if (shouldUpdateIndex(clusterState.metadata().index(targetIndex), mapping)) {
PutMappingRequest putMappingRequest = new PutMappingRequest(targetIndex).source(mapping, XContentType.JSON);

Check warning on line 140 in src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java#L140

Added line #L140 was not covered by tests
client.putMapping(putMappingRequest, actionListener);
} else {
actionListener.onResponse(new AcknowledgedResponse(true));
Expand Down Expand Up @@ -176,4 +197,11 @@
return getNewestIndexByCreationDate(strings, state);
}

public static String getIndexNameWithAlias(ClusterState clusterState, String alias) {
Optional<Map.Entry<String, IndexMetadata>> entry = clusterState.metadata().indices().entrySet().stream().filter(
stringIndexMetadataEntry -> stringIndexMetadataEntry.getValue().getAliases().containsKey(alias)
).findFirst();
return entry.map(Map.Entry::getKey).orElse(null);

Check warning on line 204 in src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java#L201-L204

Added lines #L201 - L204 were not covered by tests
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ public void initPrepackagedRulesIndex(ActionListener<CreateIndexResponse> create
IndexUtils.updateIndexMapping(
Rule.PRE_PACKAGED_RULES_INDEX,
RuleIndices.ruleMappings(), clusterService.state(), client.admin().indices(),
updateListener
updateListener,
false
);
} else {
countRules(searchListener);
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/mappings/correlation.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"_meta" : {
"schema_version": 1
"schema_version": 2
},
"properties": {
"root": {
Expand All @@ -17,7 +17,7 @@
},
"corr_vector": {
"type": "sa_vector",
"dimension": 101,
"dimension": 3,
"correlation_ctx": {
"similarityFunction": "EUCLIDEAN",
"parameters": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.model.Rule;
import org.opensearch.securityanalytics.model.ThreatIntelFeedData;
import org.opensearch.securityanalytics.util.CorrelationIndices;
import org.opensearch.test.rest.OpenSearchRestTestCase;


Expand Down Expand Up @@ -1488,6 +1489,24 @@ public List<String> getFindingIndices(String detectorType) throws IOException {
return indices;
}

public List<String> getCorrelationHistoryIndices() throws IOException {
Response response = client().performRequest(new Request("GET", "/_cat/indices/" + CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP + "?format=json"));
XContentParser xcp = createParser(XContentType.JSON.xContent(), response.getEntity().getContent());
List<Object> responseList = xcp.list();
List<String> indices = new ArrayList<>();
for (Object o : responseList) {
if (o instanceof Map) {
((Map<?, ?>) o).forEach((BiConsumer<Object, Object>)
(o1, o2) -> {
if (o1.equals("index")) {
indices.add((String) o2);
}
});
}
}
return indices;
}

public void updateClusterSetting(String setting, String value) throws IOException {
String settingJson = "{\n" +
" \"persistent\" : {" +
Expand Down
Loading
Loading