Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes threat intel default store config model #1133

Merged
merged 11 commits into from
Jul 9, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ public List<Setting<?>> getSettings() {
SecurityAnalyticsSettings.BATCH_SIZE,
SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT,
SecurityAnalyticsSettings.IOC_INDEX_RETENTION_PERIOD,
SecurityAnalyticsSettings.IOC_MAX_INDICES_PER_ALIAS
SecurityAnalyticsSettings.IOC_MAX_INDICES_PER_INDEX_PATTERN
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,11 @@
import org.opensearch.securityanalytics.action.ListIOCsAction;
import org.opensearch.securityanalytics.action.ListIOCsActionRequest;
import org.opensearch.securityanalytics.action.ListIOCsActionResponse;
import org.opensearch.securityanalytics.commons.model.STIX2;
import org.opensearch.securityanalytics.model.STIX2IOC;

import java.io.IOException;
import java.time.DateTimeException;
import java.time.Instant;
import java.util.List;
import java.util.Locale;

import static org.opensearch.securityanalytics.services.STIX2IOCFeedStore.getIocIndexAlias;

public class RestListIOCsAction extends BaseRestHandler {
private static final Logger log = LogManager.getLogger(RestListIOCsAction.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,29 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.rollover.RolloverRequest;
import org.opensearch.action.admin.indices.rollover.RolloverResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.Streams;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.securityanalytics.commons.model.IOC;
import org.opensearch.securityanalytics.commons.model.IOCType;
import org.opensearch.securityanalytics.commons.model.UpdateAction;
import org.opensearch.securityanalytics.commons.store.FeedStore;
import org.opensearch.securityanalytics.model.STIX2IOC;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext;
import org.opensearch.securityanalytics.threatIntel.model.DefaultIocStoreConfig;
import org.opensearch.securityanalytics.threatIntel.model.SATIFSourceConfig;
import org.opensearch.securityanalytics.util.IndexUtils;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand All @@ -56,7 +52,7 @@ public class STIX2IOCFeedStore implements FeedStore {
public static final String IOC_ALL_INDEX_PATTERN = IOC_INDEX_NAME_BASE + "-*";
public static final String IOC_FEED_ID_PLACEHOLDER = "FEED_ID";
public static final String IOC_INDEX_NAME_TEMPLATE = IOC_INDEX_NAME_BASE + "-" + IOC_FEED_ID_PLACEHOLDER;
public static final String IOC_WRITE_INDEX_ALIAS = IOC_INDEX_NAME_TEMPLATE;
public static final String IOC_ALL_INDEX_PATTERN_BY_ID = IOC_INDEX_NAME_TEMPLATE + "-*";
public static final String IOC_TIME_PLACEHOLDER = "TIME";
public static final String IOC_INDEX_PATTERN = IOC_INDEX_NAME_TEMPLATE + "-" + IOC_TIME_PLACEHOLDER;

Expand Down Expand Up @@ -117,80 +113,36 @@ public void storeIOCs(Map<IOC, UpdateAction> actionToIOCs) {
}

public void indexIocs(List<STIX2IOC> iocs) throws IOException {
String iocAlias = getIocIndexAlias(saTifSourceConfig.getId());
String iocPattern = getIocIndexRolloverPattern(saTifSourceConfig.getId());
String newActiveIndex = getNewActiveIndex(saTifSourceConfig.getId());
String iocIndexPattern = getAllIocIndexPatternById(saTifSourceConfig.getId());

if (iocIndexExists(iocAlias) == false) {
initFeedIndex(iocAlias, iocPattern, ActionListener.wrap(
r -> {
saTifSourceConfig.getIocTypes().forEach(type -> {
String writeIndex = IndexUtils.getWriteIndex(iocAlias, clusterService.state());
String lowerCaseType = type.toLowerCase(Locale.ROOT);
((DefaultIocStoreConfig) saTifSourceConfig.getIocStoreConfig()).getIocMapStore().putIfAbsent(lowerCaseType, new ArrayList<>());
((DefaultIocStoreConfig) saTifSourceConfig.getIocStoreConfig()).getIocMapStore().get(lowerCaseType).add(iocAlias);
((DefaultIocStoreConfig) saTifSourceConfig.getIocStoreConfig()).getIocMapStore().get(lowerCaseType).add(writeIndex);
});
bulkIndexIocs(iocs, iocAlias);
}, e-> {
log.error("Failed to initialize the IOC index and save the IOCs", e);
baseListener.onFailure(e);
}
));
} else {
rolloverIndex(iocAlias, iocPattern, ActionListener.wrap(
r -> {
saTifSourceConfig.getIocTypes().forEach(type -> {
String writeIndex = IndexUtils.getWriteIndex(iocAlias, clusterService.state());
String lowerCaseType = type.toLowerCase(Locale.ROOT);
((DefaultIocStoreConfig) saTifSourceConfig.getIocStoreConfig()).getIocMapStore().get(lowerCaseType).add(writeIndex);
});
bulkIndexIocs(iocs, iocAlias);
}, e -> {
log.error("Failed to rollover the IOC index and save the IOCs", e);
baseListener.onFailure(e);
}
));
}
}

private void rolloverIndex(
String alias,
String pattern,
ActionListener<RolloverResponse> listener
) {
if (clusterService.state().metadata().hasAlias(alias) == false) {
listener.onFailure(new OpenSearchException("Alias not initialized"));
return;
}

RolloverRequest request = new RolloverRequest(alias, pattern);
request.getCreateIndexRequest()
.mapping(iocIndexMapping())
.settings(Settings.builder().put("index.hidden", true).build());
client.admin().indices().rolloverIndex(
request,
ActionListener.wrap(
rolloverResponse -> {
if (false == rolloverResponse.isRolledOver()) {
log.info(alias + "not rolled over. Rollover condition status: " + rolloverResponse.getConditionStatus());
listener.onFailure(new OpenSearchException(alias + "not rolled over. Rollover condition status: " + rolloverResponse.getConditionStatus()));
} else {
listener.onResponse(rolloverResponse);
}
}, e -> {
log.error("rollover failed for alias [" + alias + "].");
listener.onFailure(e);
initFeedIndex(newActiveIndex, ActionListener.wrap(
r -> {
saTifSourceConfig.getIocTypes().forEach(type -> {
IOCType iocType = IOCType.fromString(type);
if (saTifSourceConfig.getIocStoreConfig() instanceof DefaultIocStoreConfig) {
List<DefaultIocStoreConfig.IocToIndexDetails> listOfIocToIndexDetails =
((DefaultIocStoreConfig) saTifSourceConfig.getIocStoreConfig()).getIocToIndexDetails();
listOfIocToIndexDetails.removeIf(iocToIndexDetails -> iocToIndexDetails.getIocType() == iocType);
DefaultIocStoreConfig.IocToIndexDetails iocToIndexDetails =
new DefaultIocStoreConfig.IocToIndexDetails(iocType, iocIndexPattern, newActiveIndex);
listOfIocToIndexDetails.add(iocToIndexDetails);
}
)
);
});
bulkIndexIocs(iocs, newActiveIndex);
}, e-> {
log.error("Failed to initialize the IOC index and save the IOCs", e);
baseListener.onFailure(e);
}
));
}

private void bulkIndexIocs(List<STIX2IOC> iocs, String iocAlias) throws IOException {
private void bulkIndexIocs(List<STIX2IOC> iocs, String activeIndex) throws IOException {
List<BulkRequest> bulkRequestList = new ArrayList<>();
BulkRequest bulkRequest = new BulkRequest();

for (STIX2IOC ioc : iocs) {
IndexRequest indexRequest = new IndexRequest(iocAlias)
IndexRequest indexRequest = new IndexRequest(activeIndex)
.id(StringUtils.isBlank(ioc.getId())? UUID.randomUUID().toString() : ioc.getId())
.opType(DocWriteRequest.OpType.INDEX)
.source(ioc.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS));
Expand Down Expand Up @@ -235,27 +187,20 @@ private void bulkIndexIocs(List<STIX2IOC> iocs, String iocAlias) throws IOExcept
}
}

public boolean iocIndexExists(String alias) {
ClusterState clusterState = clusterService.state();
return clusterState.metadata().hasAlias(alias);
public static String getAllIocIndexPatternById(String sourceConfigId) {
return IOC_ALL_INDEX_PATTERN_BY_ID.replace(IOC_FEED_ID_PLACEHOLDER, sourceConfigId.toLowerCase(Locale.ROOT));
}

public static String getIocIndexAlias(String feedSourceConfigId) {
return IOC_WRITE_INDEX_ALIAS.replace(IOC_FEED_ID_PLACEHOLDER, feedSourceConfigId.toLowerCase(Locale.ROOT));
}

public static String getIocIndexRolloverPattern(String feedSourceConfigId) {
public static String getNewActiveIndex(String sourceConfigId) {
return IOC_INDEX_PATTERN
.replace(IOC_FEED_ID_PLACEHOLDER, feedSourceConfigId.toLowerCase(Locale.ROOT))
.replace(IOC_FEED_ID_PLACEHOLDER, sourceConfigId.toLowerCase(Locale.ROOT))
.replace(IOC_TIME_PLACEHOLDER, Long.toString(Instant.now().toEpochMilli()));
}


public void initFeedIndex(String feedAliasName, String feedIndexName, ActionListener<CreateIndexResponse> listener) {
public void initFeedIndex(String feedIndexName, ActionListener<CreateIndexResponse> listener) {
var indexRequest = new CreateIndexRequest(feedIndexName)
.mapping(iocIndexMapping())
.settings(Settings.builder().put("index.hidden", true).build());
indexRequest.alias(new Alias(feedAliasName)); // set the alias
client.admin().indices().create(indexRequest, ActionListener.wrap(
r -> {
log.info("Created system index {}", feedIndexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public static final List<Setting<?>> settings() {
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<Integer> IOC_MAX_INDICES_PER_ALIAS = Setting.intSetting(
public static final Setting<Integer> IOC_MAX_INDICES_PER_INDEX_PATTERN = Setting.intSetting(
"plugins.security_analytics.ioc.max_indices_per_alias",
30,
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,36 @@

package org.opensearch.securityanalytics.threatIntel.common;

import org.opensearch.securityanalytics.commons.model.IOCType;
import org.opensearch.securityanalytics.threatIntel.model.IocUploadSource;
import org.opensearch.securityanalytics.threatIntel.model.S3Source;
import org.opensearch.securityanalytics.threatIntel.model.SATIFSourceConfigDto;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* Source config dto validator
*/
public class SourceConfigDtoValidator {
public List<String> validateSourceConfigDto(SATIFSourceConfigDto sourceConfigDto) {

List<String> errorMsgs = new ArrayList<>();
List<String> iocTypeEnumNames = Arrays.stream(IOCType.values())
.map(Enum::name)
.collect(Collectors.toList());

if (sourceConfigDto.getIocTypes().isEmpty()) {
errorMsgs.add("Must specify at least one IOC type");
} else {
for (String s: sourceConfigDto.getIocTypes()) {
if (false == iocTypeEnumNames.contains(s)) {
errorMsgs.add("Invalid IOC type: " + s);
}
}
}

switch (sourceConfigDto.getType()) {
case IOC_UPLOAD:
if (sourceConfigDto.isEnabled()) {
Expand Down
Loading
Loading