Skip to content

Commit

Permalink
Using alerting workflows in detectors
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
eirsep authored and sbcd90 committed Sep 6, 2023
1 parent bf2b219 commit a82fbc9
Show file tree
Hide file tree
Showing 16 changed files with 1,472 additions and 302 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ public List<Setting<?>> getSettings() {
SecurityAnalyticsSettings.FINDING_HISTORY_RETENTION_PERIOD,
SecurityAnalyticsSettings.IS_CORRELATION_INDEX_SETTING,
SecurityAnalyticsSettings.CORRELATION_TIME_WINDOW,
SecurityAnalyticsSettings.DEFAULT_MAPPING_SCHEMA
SecurityAnalyticsSettings.DEFAULT_MAPPING_SCHEMA,
SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE
);
}

Expand Down
47 changes: 44 additions & 3 deletions src/main/java/org/opensearch/securityanalytics/model/Detector.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class Detector implements Writeable, ToXContentObject {
public static final String ENABLED_TIME_FIELD = "enabled_time";
public static final String ALERTING_MONITOR_ID = "monitor_id";

/** Field name ({@code "workflow_ids"}) under which the detector's workflow id list is written and parsed. */
public static final String ALERTING_WORKFLOW_ID = "workflow_ids";

public static final String BUCKET_MONITOR_ID_RULE_ID = "bucket_monitor_id_rule_id";
private static final String RULE_TOPIC_INDEX = "rule_topic_index";

Expand Down Expand Up @@ -99,6 +101,8 @@ public class Detector implements Writeable, ToXContentObject {

private Map<String, String> ruleIdMonitorIdMap;

/** Ids of the workflows attached to this detector; may be {@code null}, since the constructor stores the argument as given. */
private List<String> workflowIds;

private String ruleIndex;

private String alertsIndex;
Expand All @@ -117,7 +121,7 @@ public Detector(String id, Long version, String name, Boolean enabled, Schedule
Instant lastUpdateTime, Instant enabledTime, String logType,
User user, List<DetectorInput> inputs, List<DetectorTrigger> triggers, List<String> monitorIds,
String ruleIndex, String alertsIndex, String alertsHistoryIndex, String alertsHistoryIndexPattern,
String findingsIndex, String findingsIndexPattern, Map<String, String> rulePerMonitor) {
String findingsIndex, String findingsIndexPattern, Map<String, String> rulePerMonitor, List<String> workflowIds) {
this.type = DETECTOR_TYPE;

this.id = id != null ? id : NO_ID;
Expand All @@ -139,6 +143,7 @@ public Detector(String id, Long version, String name, Boolean enabled, Schedule
this.findingsIndexPattern = findingsIndexPattern;
this.ruleIdMonitorIdMap = rulePerMonitor;
this.logType = logType;
this.workflowIds = workflowIds != null ? workflowIds : null;

if (enabled) {
Objects.requireNonNull(enabledTime);
Expand All @@ -165,7 +170,8 @@ public Detector(StreamInput sin) throws IOException {
sin.readString(),
sin.readString(),
sin.readString(),
sin.readMap(StreamInput::readString, StreamInput::readString)
sin.readMap(StreamInput::readString, StreamInput::readString),
sin.readStringList()
);
}

Expand Down Expand Up @@ -200,6 +206,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(ruleIndex);

out.writeMap(ruleIdMonitorIdMap, StreamOutput::writeString, StreamOutput::writeString);

if (workflowIds != null) {
out.writeStringCollection(workflowIds);
}
}

public XContentBuilder toXContentWithUser(XContentBuilder builder, Params params) throws IOException {
Expand Down Expand Up @@ -253,6 +263,14 @@ private XContentBuilder createXContentBuilder(XContentBuilder builder, ToXConten
}

builder.field(ALERTING_MONITOR_ID, monitorIds);

if (workflowIds == null) {
builder.nullField(ALERTING_WORKFLOW_ID);
} else {
builder.field(ALERTING_WORKFLOW_ID, workflowIds);
}


builder.field(BUCKET_MONITOR_ID_RULE_ID, ruleIdMonitorIdMap);
builder.field(RULE_TOPIC_INDEX, ruleIndex);
builder.field(ALERTS_INDEX, alertsIndex);
Expand Down Expand Up @@ -299,6 +317,7 @@ public static Detector parse(XContentParser xcp, String id, Long version) throws
List<DetectorInput> inputs = new ArrayList<>();
List<DetectorTrigger> triggers = new ArrayList<>();
List<String> monitorIds = new ArrayList<>();
List<String> workflowIds = new ArrayList<>();
Map<String, String> rulePerMonitor = new HashMap<>();

String ruleIndex = null;
Expand Down Expand Up @@ -374,6 +393,15 @@ public static Detector parse(XContentParser xcp, String id, Long version) throws
monitorIds.add(monitorId);
}
break;
case ALERTING_WORKFLOW_ID:
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp);
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
String workflowId = xcp.textOrNull();
if (workflowId != null) {
workflowIds.add(workflowId);
}
}
break;
case BUCKET_MONITOR_ID_RULE_ID:
rulePerMonitor= xcp.mapStrings();
break;
Expand Down Expand Up @@ -429,7 +457,8 @@ public static Detector parse(XContentParser xcp, String id, Long version) throws
alertsHistoryIndexPattern,
findingsIndex,
findingsIndexPattern,
rulePerMonitor
rulePerMonitor,
workflowIds
);
}

Expand Down Expand Up @@ -566,10 +595,22 @@ public void setRuleIdMonitorIdMap(Map<String, String> ruleIdMonitorIdMap) {
this.ruleIdMonitorIdMap = ruleIdMonitorIdMap;
}

/**
 * Replaces the list of workflow ids held by this detector.
 *
 * @param workflowIds the new list; stored as-is (no copy, no null check)
 */
public void setWorkflowIds(List<String> workflowIds) {
this.workflowIds = workflowIds;
}

/**
 * Returns the workflow ids held by this detector.
 *
 * @return the stored list itself (not a copy); may be {@code null}
 */
public List<String> getWorkflowIds() {
return workflowIds;
}

/**
 * Looks up the monitor id stored under the {@code DOC_LEVEL_MONITOR} key of the
 * rule-id-to-monitor-id map.
 *
 * @return the mapped monitor id, or {@code null} when the map has no such key
 */
public String getDocLevelMonitorId() {
    final String docLevelMonitorId = ruleIdMonitorIdMap.get(DOC_LEVEL_MONITOR);
    return docLevelMonitorId;
}

/**
 * Tells whether this detector has at least one workflow id.
 *
 * @return {@code true} when the workflow id list is non-null and non-empty
 */
public boolean isWorkflowSupported() {
    if (workflowIds == null) {
        return false;
    }
    return !workflowIds.isEmpty();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ public class SecurityAnalyticsSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
);

/**
 * Boolean setting {@code plugins.security_analytics.enable_workflow_usage}, default {@code false}.
 * Declared with {@code NodeScope} and {@code Dynamic} properties.
 */
public static final Setting<Boolean> ENABLE_WORKFLOW_USAGE = Setting.boolSetting(
"plugins.security_analytics.enable_workflow_usage",
false,
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<Boolean> IS_CORRELATION_INDEX_SETTING = Setting.boolSetting(CORRELATION_INDEX, false, Setting.Property.IndexScope);

public static final Setting<TimeValue> CORRELATION_TIME_WINDOW = Setting.positiveTimeSetting(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@
*/
package org.opensearch.securityanalytics.transport;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.SetOnce;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.StepListener;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
Expand All @@ -25,28 +20,41 @@
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.alerting.AlertingPluginInterface;
import org.opensearch.commons.alerting.action.DeleteMonitorRequest;
import org.opensearch.commons.alerting.action.DeleteMonitorResponse;
import org.opensearch.commons.alerting.action.DeleteWorkflowResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.extensions.AcknowledgedResponse;
import org.opensearch.securityanalytics.action.DeleteDetectorAction;
import org.opensearch.securityanalytics.action.DeleteDetectorRequest;
import org.opensearch.securityanalytics.action.DeleteDetectorResponse;
import org.opensearch.securityanalytics.mapper.IndexTemplateManager;
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.MonitorService;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.securityanalytics.util.WorkflowService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.securityanalytics.model.Detector.NO_VERSION;

Expand All @@ -60,21 +68,38 @@ public class TransportDeleteDetectorAction extends HandledTransportAction<Delete

private final NamedXContentRegistry xContentRegistry;

private final WorkflowService workflowService;

private final MonitorService monitorService;

private final ThreadPool threadPool;

private final Settings settings;

private final ClusterService clusterService;
private volatile Boolean enabledWorkflowUsage;

private final IndexTemplateManager indexTemplateManager;

private final DetectorIndices detectorIndices;

/**
 * Builds the delete-detector transport action.
 *
 * Stores the injected collaborators, takes the thread pool from the client, creates a
 * {@code MonitorService} and a {@code WorkflowService} on top of the client, reads the
 * initial value of {@code ENABLE_WORKFLOW_USAGE} from the supplied settings and registers
 * {@code setEnabledWorkflowUsage} as the consumer for later updates of that setting.
 */
@Inject
public TransportDeleteDetectorAction(TransportService transportService, IndexTemplateManager indexTemplateManager, Client client, ActionFilters actionFilters, NamedXContentRegistry xContentRegistry, RuleTopicIndices ruleTopicIndices, DetectorIndices detectorIndices) {
public TransportDeleteDetectorAction(TransportService transportService, IndexTemplateManager indexTemplateManager, Client client, ActionFilters actionFilters, NamedXContentRegistry xContentRegistry, RuleTopicIndices ruleTopicIndices, DetectorIndices detectorIndices, ClusterService clusterService,
Settings settings) {
super(DeleteDetectorAction.NAME, transportService, actionFilters, DeleteDetectorRequest::new);
this.client = client;
this.ruleTopicIndices = ruleTopicIndices;
this.xContentRegistry = xContentRegistry;
this.threadPool = client.threadPool();
this.indexTemplateManager = indexTemplateManager;
this.detectorIndices = detectorIndices;
this.monitorService = new MonitorService(client);
// The workflow service is built from the same client and the monitor service created above.
this.workflowService = new WorkflowService(client, monitorService);
this.clusterService = clusterService;
this.settings = settings;

// Seed the flag from the supplied settings, then keep it in sync with later setting updates.
this.enabledWorkflowUsage = SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE.get(this.settings);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE, this::setEnabledWorkflowUsage);
}

@Override
Expand Down Expand Up @@ -155,39 +180,63 @@ public void onFailure(Exception t) {
}

/**
 * Continues the delete flow for a detector.
 *
 * Calls {@code deleteWorkflow} first; once that step completes, issues one
 * {@code deleteAlertingMonitor} call per monitor id and collects the responses in a
 * {@code GroupedActionListener}, which then calls {@code deleteDetectorFromConfig}.
 * A failure of the workflow step is reported through {@code finishHim}, guarded by {@code counter}.
 */
private void onGetResponse(Detector detector) {
List<String> monitorIds = detector.getMonitorIds();
ActionListener<DeleteMonitorResponse> deletesListener = new GroupedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Collection<DeleteMonitorResponse> responses) {
SetOnce<RestStatus> errorStatusSupplier = new SetOnce<>();
if (responses.stream().filter(response -> {
if (response.getStatus() != RestStatus.OK) {
log.error("Detector not being deleted because monitor [{}] could not be deleted. Status [{}]", response.getId(), response.getStatus());
errorStatusSupplier.trySet(response.getStatus());
return true;
StepListener<AcknowledgedResponse> onDeleteWorkflowStep = new StepListener<>();
// 1. Delete the workflow if the workflow is supported
deleteWorkflow(detector, onDeleteWorkflowStep);
onDeleteWorkflowStep.whenComplete(acknowledgedResponse -> {
List<String> monitorIds = detector.getMonitorIds();
// NOTE(review): the grouped listener is sized with monitorIds.size() (see below) — confirm behaviour when the detector has no monitors.
ActionListener<DeleteMonitorResponse> deletesListener = new GroupedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Collection<DeleteMonitorResponse> responses) {
// Remembers the first non-OK status seen; trySet leaves an already-set value untouched.
SetOnce<RestStatus> errorStatusSupplier = new SetOnce<>();
if (responses.stream().filter(response -> {
if (response.getStatus() != RestStatus.OK) {
log.error("Detector not being deleted because monitor [{}] could not be deleted. Status [{}]", response.getId(), response.getStatus());
errorStatusSupplier.trySet(response.getStatus());
return true;
}
return false;
}).count() > 0) {
onFailures(new OpenSearchStatusException("Monitor associated with detected could not be deleted", errorStatusSupplier.get()));
}
return false;
}).count() > 0) {
onFailures(new OpenSearchStatusException("Monitor associated with detected could not be deleted", errorStatusSupplier.get()));
// NOTE(review): there is no return after onFailures(...) above, so the detector document is still
// deleted even though the log message says it is not being deleted — confirm this is intended.
deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy());
}
deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy());
}

@Override
public void onFailure(Exception e) {
if(isOnlyMonitorOrIndexMissingExceptionThrownByGroupedActionListener(e, detector.getId())) {
deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy());
} else {
log.error(String.format(Locale.ROOT, "Failed to delete detector %s", detector.getId()), e);
if (counter.compareAndSet(false, true)) {
finishHim(null, e);
@Override
public void onFailure(Exception e) {
// Missing monitors or a missing alerting config index are treated as "already deleted".
if (isOnlyMonitorOrIndexMissingExceptionThrownByGroupedActionListener(e, detector.getId())) {
deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy());
} else {
log.error(String.format(Locale.ROOT, "Failed to delete detector %s", detector.getId()), e);
if (counter.compareAndSet(false, true)) {
finishHim(null, e);
}
}
}
}, monitorIds.size());
for (String monitorId : monitorIds) {
deleteAlertingMonitor(monitorId, request.getRefreshPolicy(),
deletesListener);
}
}, e -> {
// Workflow deletion failed: report the error once, without touching the monitors.
if (counter.compareAndSet(false, true)) {
finishHim(null, e);
}
}, monitorIds.size());
for (String monitorId : monitorIds) {
deleteAlertingMonitor(monitorId, request.getRefreshPolicy(),
deletesListener);
});
}

/**
 * Deletes the detector's workflow, when there is one to delete, and then notifies
 * {@code actionListener}.
 *
 * The workflow is deleted only if the detector reports workflow ids
 * ({@link Detector#isWorkflowSupported()}) and workflow usage is currently enabled.
 * Only the first id in the detector's workflow id list is deleted.
 *
 * @param detector       detector whose workflow should be removed
 * @param actionListener receives an acknowledged response on success or when the step is
 *                       skipped, and the failure when the workflow deletion fails
 */
private void deleteWorkflow(Detector detector, ActionListener<AcknowledgedResponse> actionListener) {
    if (!detector.isWorkflowSupported() || !enabledWorkflowUsage) {
        // Nothing to delete: either the detector carries no workflow ids (e.g. it was created
        // without workflows) or workflow usage is switched off. Acknowledge and move on.
        actionListener.onResponse(new AcknowledgedResponse(true));
        return;
    }

    String workflowId = detector.getWorkflowIds().get(0);
    // Parameterized logging: no default-locale String.format, and no formatting cost when debug is off.
    log.debug("Deleting the workflow {} before deleting the detector", workflowId);

    StepListener<DeleteWorkflowResponse> onDeleteWorkflowStep = new StepListener<>();
    workflowService.deleteWorkflow(workflowId, onDeleteWorkflowStep);
    onDeleteWorkflowStep.whenComplete(
        deleteWorkflowResponse -> actionListener.onResponse(new AcknowledgedResponse(true)),
        actionListener::onFailure
    );
}

Expand All @@ -211,6 +260,7 @@ public void onFailure(Exception e) {
});

}

@Override
public void onFailure(Exception t) {
onFailures(t);
Expand All @@ -235,7 +285,7 @@ private void onFailures(Exception t) {
private void finishHim(String detectorId, Exception t) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(ActionRunnable.supply(listener, () -> {
if (t != null) {
log.error(String.format(Locale.ROOT, "Failed to delete detector %s",detectorId), t);
log.error(String.format(Locale.ROOT, "Failed to delete detector %s", detectorId), t);
if (t instanceof OpenSearchStatusException) {
throw t;
}
Expand All @@ -256,8 +306,8 @@ private boolean isOnlyMonitorOrIndexMissingExceptionThrownByGroupedActionListene
for (int i = 0; i <= len; i++) {
Throwable e = i == len ? ex : ex.getSuppressed()[i];
if (e.getMessage().matches("(.*)Monitor(.*) is not found(.*)")
|| e.getMessage().contains(
"Configured indices are not found: [.opendistro-alerting-config]")
|| e.getMessage().contains(
"Configured indices are not found: [.opendistro-alerting-config]")
) {
log.error(
String.format(Locale.ROOT, "Monitor or jobs index already deleted." +
Expand All @@ -270,4 +320,8 @@ private boolean isOnlyMonitorOrIndexMissingExceptionThrownByGroupedActionListene
return true;
}
}

/**
 * Stores a new value of the workflow-usage flag. Registered in the constructor as the
 * update consumer for {@code SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE}.
 *
 * @param enabledWorkflowUsage the updated setting value
 */
private void setEnabledWorkflowUsage(boolean enabledWorkflowUsage) {
this.enabledWorkflowUsage = enabledWorkflowUsage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected void doExecute(Task task, GetFindingsRequest request, ActionListener<G
actionListener
);
} else {
// "detector" is nested type so we have to use nested query
// "detector" is nested type, so we have to use nested query
NestedQueryBuilder queryBuilder =
QueryBuilders.nestedQuery(
"detector",
Expand Down
Loading

0 comments on commit a82fbc9

Please sign in to comment.