Skip to content

Commit

Permalink
Enabled multiple detector type support
Browse files Browse the repository at this point in the history
GetFinding supporting multiple detector types

GetAlerts supporting multiple detector types

Added multiple detector types in request model

Added grouped listener for acking the alerts

Introduced breaking change - removed detector type from a detector. Added collection of detector types in input field

Refactored tests and updated constructor from a Detector not to consider detector type property

Signed-off-by: Stevan Buzejic <[email protected]>
  • Loading branch information
stevanbuzejic authored and stevanbz committed Jan 9, 2023
1 parent 1770873 commit d8997c4
Show file tree
Hide file tree
Showing 27 changed files with 2,031 additions and 475 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field(_VERSION, version);
builder.startObject("detector")
.field(Detector.NAME_FIELD, detector.getName())
.field(Detector.DETECTOR_TYPE_FIELD, detector.getDetectorType())
.field(Detector.ENABLED_FIELD, detector.getEnabled())
.field(Detector.SCHEDULE_FIELD, detector.getSchedule())
.field(Detector.INPUTS_FIELD, detector.getInputs())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field(_VERSION, version);
builder.startObject("detector")
.field(Detector.NAME_FIELD, detector.getName())
.field(Detector.DETECTOR_TYPE_FIELD, detector.getDetectorType())
.field(Detector.ENABLED_FIELD, detector.getEnabled())
.field(Detector.SCHEDULE_FIELD, detector.getSchedule())
.field(Detector.INPUTS_FIELD, detector.getInputs())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,28 +78,46 @@ public void onResponse(GetDetectorResponse getDetectorResponse) {
detector.getMonitorIds().forEach(
monitorId -> monitorToDetectorMapping.put(monitorId, detector.getId())
);
// Get alerts for all monitor ids
AlertsService.this.getAlertsByMonitorIds(

List<String> detectorTypes = detector.getDetectorTypes();

GroupedActionListener<GetAlertsResponse> getAlertsResponseListener = new GroupedActionListener(
new ActionListener<Collection<GetAlertsResponse>>() {
@Override
public void onResponse(Collection<GetAlertsResponse> alertsResponses) {
List<AlertDto> alerts = new ArrayList<>();
// Merge all findings into one response
int totalAlerts = alertsResponses.stream().map(GetAlertsResponse::getTotalAlerts).collect(
Collectors.summingInt(Integer::intValue));
alerts.addAll(alertsResponses.stream().flatMap(getAlertsResponse -> getAlertsResponse.getAlerts().stream()).collect(
Collectors.toList()));

GetAlertsResponse masterResponse = new GetAlertsResponse(
alerts,
totalAlerts
);
listener.onResponse(masterResponse);
}
@Override
public void onFailure(Exception e) {
log.error("Failed to fetch alerts for detectorId: " + detectorId, e);
listener.onFailure(SecurityAnalyticsException.wrap(e));
}
}, detectorTypes.size());

for (String detectorType: detectorTypes) {
// Get alerts for all monitor ids
AlertsService.this.getAlertsByMonitorIds(
monitorToDetectorMapping,
// TODO - Monitor list will contain all the monitors event those from another detector type
monitorIds,
DetectorMonitorConfig.getAllAlertsIndicesPattern(detector.getDetectorType()),
DetectorMonitorConfig.getAllAlertsIndicesPattern(detectorType),
table,
severityLevel,
alertState,
new ActionListener<>() {
@Override
public void onResponse(GetAlertsResponse getAlertsResponse) {
// Send response back
listener.onResponse(getAlertsResponse);
}

@Override
public void onFailure(Exception e) {
log.error("Failed to fetch alerts for detectorId: " + detectorId, e);
listener.onFailure(SecurityAnalyticsException.wrap(e));
}
}
);
getAlertsResponseListener
);
}
}

@Override
Expand Down Expand Up @@ -240,18 +258,46 @@ public void getAlerts(List<String> alertIds,
Detector detector,
Table table,
ActionListener<org.opensearch.commons.alerting.action.GetAlertsResponse> actionListener) {
GetAlertsRequest request = new GetAlertsRequest(

List<String> detectorTypes = detector.getDetectorTypes();

ActionListener<org.opensearch.commons.alerting.action.GetAlertsResponse> getAlertsResponseListener = new GroupedActionListener(
new ActionListener<Collection<org.opensearch.commons.alerting.action.GetAlertsResponse>>() {
@Override
public void onResponse(Collection<org.opensearch.commons.alerting.action.GetAlertsResponse> alertsResponses) {
List<Alert> alerts = new ArrayList<>();
// Merge all findings into one response
int totalAlerts = alertsResponses.stream().map(org.opensearch.commons.alerting.action.GetAlertsResponse::getTotalAlerts).collect(
Collectors.summingInt(Integer::intValue));
alerts.addAll(alertsResponses.stream().flatMap(getAlertsResponse -> getAlertsResponse.getAlerts().stream()).collect(
Collectors.toList()));

org.opensearch.commons.alerting.action.GetAlertsResponse masterResponse = new org.opensearch.commons.alerting.action.GetAlertsResponse(
alerts,
totalAlerts
);
actionListener.onResponse(masterResponse);
}
@Override
public void onFailure(Exception e) {
log.error("Failed to fetch alerts for detectorId: " + detector.getId(), e);
actionListener.onFailure(SecurityAnalyticsException.wrap(e));
}
}, detectorTypes.size());

for(String detectorType: detectorTypes) {
GetAlertsRequest request = new GetAlertsRequest(
table,
"ALL",
"ALL",
null,
DetectorMonitorConfig.getAllAlertsIndicesPattern(detector.getDetectorType()),
DetectorMonitorConfig.getAllAlertsIndicesPattern(detectorType),
null,
alertIds);
AlertingPluginInterface.INSTANCE.getAlerts(
AlertingPluginInterface.INSTANCE.getAlerts(
(NodeClient) client,
request, actionListener);

request, getAlertsResponseListener);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package org.opensearch.securityanalytics.config.monitors;

import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.opensearch.securityanalytics.model.Detector;

Expand All @@ -15,7 +17,7 @@


public class DetectorMonitorConfig {

private static Pattern findingIndexRegexPattern = Pattern.compile(".opensearch-sap-(.*?)-findings");
public static final String OPENSEARCH_DEFAULT_RULE_INDEX = ".opensearch-sap-detectors-queries-default";
public static final String OPENSEARCH_DEFAULT_ALERT_INDEX = ".opensearch-sap-alerts-default";
public static final String OPENSEARCH_DEFAULT_ALL_ALERT_INDICES_PATTERN = ".opensearch-sap-alerts-default*";
Expand Down Expand Up @@ -119,14 +121,20 @@ public static String getFindingsIndexPattern(String detectorType) {
OPENSEARCH_DEFAULT_FINDINGS_INDEX_PATTERN;
}

public static Map<String, Map<String, String>> getRuleIndexMappingsByType(String detectorType) {
public static Map<String, Map<String, String>> getRuleIndexMappingsByType() {
HashMap<String, String> properties = new HashMap<>();
properties.put("analyzer", "rule_analyzer");
HashMap<String, Map<String, String>> fieldMappingProperties = new HashMap<>();
fieldMappingProperties.put("text", properties);
return fieldMappingProperties;
}

public static String getRuleCategoryFromFindingIndexName(String findingIndex) {
Matcher matcher = findingIndexRegexPattern.matcher(findingIndex);
matcher.find();
return matcher.group(1);
}

public static class MonitorConfig {
private final String alertsIndex;
private final String alertsHistoryIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package org.opensearch.securityanalytics.findings;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -14,6 +15,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.commons.alerting.AlertingPluginInterface;
Expand Down Expand Up @@ -52,51 +54,58 @@ public FindingsService(Client client) {
* @param table group of search related parameters
* @param listener ActionListener to get notified on response or error
*/
public void getFindingsByDetectorId(String detectorId, Table table, ActionListener<GetFindingsResponse> listener ) {
public void getFindingsByDetectorId(String detectorId, Table table, ActionListener<GetFindingsResponse> listener) {
this.client.execute(GetDetectorAction.INSTANCE, new GetDetectorRequest(detectorId, -3L), new ActionListener<>() {

@Override
public void onResponse(GetDetectorResponse getDetectorResponse) {
// Get all monitor ids from detector
Detector detector = getDetectorResponse.getDetector();
List<String> monitorIds = detector.getMonitorIds();
ActionListener<GetFindingsResponse> getFindingsResponseListener = new ActionListener<>() {
@Override
public void onResponse(GetFindingsResponse resp) {
Integer totalFindings = 0;
List<FindingDto> findings = new ArrayList<>();
// Merge all findings into one response
totalFindings += resp.getTotalFindings();
findings.addAll(resp.getFindings());

GetFindingsResponse masterResponse = new GetFindingsResponse(
totalFindings,
findings
);
// Send master response back
listener.onResponse(masterResponse);
}

@Override
public void onFailure(Exception e) {
log.error("Failed to fetch findings for detector " + detectorId, e);
listener.onFailure(SecurityAnalyticsException.wrap(e));
}
};

// monitor --> detectorId mapping
Map<String, Detector> monitorToDetectorMapping = new HashMap<>();
detector.getMonitorIds().forEach(
monitorId -> monitorToDetectorMapping.put(monitorId, detector)
);

List<String> detectorTypes = detector.getDetectorTypes();

ActionListener<GetFindingsResponse> getFindingsResponseListener = new GroupedActionListener(
new ActionListener<Collection<GetFindingsResponse>>() {
@Override
public void onResponse(Collection<GetFindingsResponse> findingsResponses) {
List<FindingDto> findings = new ArrayList<>();
// Merge all findings into one response
int totalFindings = findingsResponses.stream().map(GetFindingsResponse::getTotalFindings).collect(
Collectors.summingInt(Integer::intValue));
findings.addAll(findingsResponses.stream().flatMap(getFindingsResponse -> getFindingsResponse.getFindings().stream()).collect(
Collectors.toList()));

GetFindingsResponse masterResponse = new GetFindingsResponse(
totalFindings,
findings
);
listener.onResponse(masterResponse);
}
@Override
public void onFailure(Exception e) {
log.error("Failed to fetch findings for detector " + detectorId, e);
listener.onFailure(SecurityAnalyticsException.wrap(e));
}
}, detectorTypes.size());

// Get findings for all monitor ids
FindingsService.this.getFindingsByMonitorIds(
for (String detectorType: detectorTypes) {
FindingsService.this.getFindingsByMonitorIds(
monitorToDetectorMapping,
monitorIds,
DetectorMonitorConfig.getAllFindingsIndicesPattern(detector.getDetectorType()),
DetectorMonitorConfig.getAllFindingsIndicesPattern(detectorType),
table,
getFindingsResponseListener
);
);
}

}

@Override
Expand Down Expand Up @@ -135,6 +144,7 @@ public void getFindingsByMonitorIds(
public void onResponse(
org.opensearch.commons.alerting.action.GetFindingsResponse getFindingsResponse
) {
log.error("alerts response size from alerting" + getFindingsResponse.getTotalFindings());
// Convert response to SA's GetFindingsResponse
listener.onResponse(new GetFindingsResponse(
getFindingsResponse.getTotalFindings(),
Expand Down Expand Up @@ -206,7 +216,7 @@ public void onFailure(Exception e) {
public FindingDto mapFindingWithDocsToFindingDto(FindingWithDocs findingWithDocs, Detector detector) {
List<DocLevelQuery> docLevelQueries = findingWithDocs.getFinding().getDocLevelQueries();
if (docLevelQueries.isEmpty()) { // this is finding generated by a bucket level monitor
for (Map.Entry<String, String> entry : detector.getRuleIdMonitorIdMap().entrySet()) {
for (Map.Entry<String, String> entry : detector.getBucketRuleIdMonitorIdMap().entrySet()) {
if(entry.getValue().equals(findingWithDocs.getFinding().getMonitorId())) {
docLevelQueries = Collections.singletonList(new DocLevelQuery(entry.getKey(),"","",Collections.emptyList()));
}
Expand Down
Loading

0 comments on commit d8997c4

Please sign in to comment.