Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using alerting workflows in detectors #541

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ public List<Setting<?>> getSettings() {
SecurityAnalyticsSettings.FINDING_HISTORY_RETENTION_PERIOD,
SecurityAnalyticsSettings.IS_CORRELATION_INDEX_SETTING,
SecurityAnalyticsSettings.CORRELATION_TIME_WINDOW,
SecurityAnalyticsSettings.DEFAULT_MAPPING_SCHEMA
SecurityAnalyticsSettings.DEFAULT_MAPPING_SCHEMA,
SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class Detector implements Writeable, ToXContentObject {
public static final String ENABLED_TIME_FIELD = "enabled_time";
public static final String ALERTING_MONITOR_ID = "monitor_id";

public static final String ALERTING_WORKFLOW_ID = "workflow_ids";

public static final String BUCKET_MONITOR_ID_RULE_ID = "bucket_monitor_id_rule_id";
private static final String RULE_TOPIC_INDEX = "rule_topic_index";

Expand Down Expand Up @@ -99,6 +101,8 @@ public class Detector implements Writeable, ToXContentObject {

private Map<String, String> ruleIdMonitorIdMap;

private List<String> workflowIds;

private String ruleIndex;

private String alertsIndex;
Expand All @@ -117,7 +121,7 @@ public Detector(String id, Long version, String name, Boolean enabled, Schedule
Instant lastUpdateTime, Instant enabledTime, String logType,
User user, List<DetectorInput> inputs, List<DetectorTrigger> triggers, List<String> monitorIds,
String ruleIndex, String alertsIndex, String alertsHistoryIndex, String alertsHistoryIndexPattern,
String findingsIndex, String findingsIndexPattern, Map<String, String> rulePerMonitor) {
String findingsIndex, String findingsIndexPattern, Map<String, String> rulePerMonitor, List<String> workflowIds) {
this.type = DETECTOR_TYPE;

this.id = id != null ? id : NO_ID;
Expand All @@ -139,6 +143,7 @@ public Detector(String id, Long version, String name, Boolean enabled, Schedule
this.findingsIndexPattern = findingsIndexPattern;
this.ruleIdMonitorIdMap = rulePerMonitor;
this.logType = logType;
this.workflowIds = workflowIds != null ? workflowIds : null;

if (enabled) {
Objects.requireNonNull(enabledTime);
Expand All @@ -165,7 +170,8 @@ public Detector(StreamInput sin) throws IOException {
sin.readString(),
sin.readString(),
sin.readString(),
sin.readMap(StreamInput::readString, StreamInput::readString)
sin.readMap(StreamInput::readString, StreamInput::readString),
sin.readStringList()
);
}

Expand Down Expand Up @@ -200,6 +206,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(ruleIndex);

out.writeMap(ruleIdMonitorIdMap, StreamOutput::writeString, StreamOutput::writeString);

if (workflowIds != null) {
out.writeStringCollection(workflowIds);
}
}

public XContentBuilder toXContentWithUser(XContentBuilder builder, Params params) throws IOException {
Expand Down Expand Up @@ -253,6 +263,14 @@ private XContentBuilder createXContentBuilder(XContentBuilder builder, ToXConten
}

builder.field(ALERTING_MONITOR_ID, monitorIds);

if (workflowIds == null) {
builder.nullField(ALERTING_WORKFLOW_ID);
} else {
builder.field(ALERTING_WORKFLOW_ID, workflowIds);
}


builder.field(BUCKET_MONITOR_ID_RULE_ID, ruleIdMonitorIdMap);
builder.field(RULE_TOPIC_INDEX, ruleIndex);
builder.field(ALERTS_INDEX, alertsIndex);
Expand Down Expand Up @@ -299,6 +317,7 @@ public static Detector parse(XContentParser xcp, String id, Long version) throws
List<DetectorInput> inputs = new ArrayList<>();
List<DetectorTrigger> triggers = new ArrayList<>();
List<String> monitorIds = new ArrayList<>();
List<String> workflowIds = new ArrayList<>();
Map<String, String> rulePerMonitor = new HashMap<>();

String ruleIndex = null;
Expand Down Expand Up @@ -374,6 +393,15 @@ public static Detector parse(XContentParser xcp, String id, Long version) throws
monitorIds.add(monitorId);
}
break;
case ALERTING_WORKFLOW_ID:
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp);
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
String workflowId = xcp.textOrNull();
if (workflowId != null) {
workflowIds.add(workflowId);
}
}
break;
case BUCKET_MONITOR_ID_RULE_ID:
rulePerMonitor= xcp.mapStrings();
break;
Expand Down Expand Up @@ -429,7 +457,8 @@ public static Detector parse(XContentParser xcp, String id, Long version) throws
alertsHistoryIndexPattern,
findingsIndex,
findingsIndexPattern,
rulePerMonitor
rulePerMonitor,
workflowIds
);
}

Expand Down Expand Up @@ -566,10 +595,22 @@ public void setRuleIdMonitorIdMap(Map<String, String> ruleIdMonitorIdMap) {
this.ruleIdMonitorIdMap = ruleIdMonitorIdMap;
}

public void setWorkflowIds(List<String> workflowIds) {
this.workflowIds = workflowIds;
}

public List<String> getWorkflowIds() {
return workflowIds;
}

public String getDocLevelMonitorId() {
return ruleIdMonitorIdMap.get(DOC_LEVEL_MONITOR);
}

public boolean isWorkflowSupported() {
return workflowIds != null && !workflowIds.isEmpty();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ public class SecurityAnalyticsSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<Boolean> ENABLE_WORKFLOW_USAGE = Setting.boolSetting(
"plugins.security_analytics.enable_workflow_usage",
false,
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<Boolean> IS_CORRELATION_INDEX_SETTING = Setting.boolSetting(CORRELATION_INDEX, false, Setting.Property.IndexScope);

public static final Setting<TimeValue> CORRELATION_TIME_WINDOW = Setting.positiveTimeSetting(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@
*/
package org.opensearch.securityanalytics.transport;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.SetOnce;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.StepListener;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
Expand All @@ -25,28 +20,41 @@
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.alerting.AlertingPluginInterface;
import org.opensearch.commons.alerting.action.DeleteMonitorRequest;
import org.opensearch.commons.alerting.action.DeleteMonitorResponse;
import org.opensearch.commons.alerting.action.DeleteWorkflowResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.extensions.AcknowledgedResponse;
import org.opensearch.securityanalytics.action.DeleteDetectorAction;
import org.opensearch.securityanalytics.action.DeleteDetectorRequest;
import org.opensearch.securityanalytics.action.DeleteDetectorResponse;
import org.opensearch.securityanalytics.mapper.IndexTemplateManager;
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.MonitorService;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.securityanalytics.util.WorkflowService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.securityanalytics.model.Detector.NO_VERSION;

Expand All @@ -60,21 +68,38 @@ public class TransportDeleteDetectorAction extends HandledTransportAction<Delete

private final NamedXContentRegistry xContentRegistry;

private final WorkflowService workflowService;

private final MonitorService monitorService;

private final ThreadPool threadPool;

private final Settings settings;

private final ClusterService clusterService;
private volatile Boolean enabledWorkflowUsage;

private final IndexTemplateManager indexTemplateManager;

private final DetectorIndices detectorIndices;

@Inject
public TransportDeleteDetectorAction(TransportService transportService, IndexTemplateManager indexTemplateManager, Client client, ActionFilters actionFilters, NamedXContentRegistry xContentRegistry, RuleTopicIndices ruleTopicIndices, DetectorIndices detectorIndices) {
public TransportDeleteDetectorAction(TransportService transportService, IndexTemplateManager indexTemplateManager, Client client, ActionFilters actionFilters, NamedXContentRegistry xContentRegistry, RuleTopicIndices ruleTopicIndices, DetectorIndices detectorIndices, ClusterService clusterService,
Settings settings) {
super(DeleteDetectorAction.NAME, transportService, actionFilters, DeleteDetectorRequest::new);
this.client = client;
this.ruleTopicIndices = ruleTopicIndices;
this.xContentRegistry = xContentRegistry;
this.threadPool = client.threadPool();
this.indexTemplateManager = indexTemplateManager;
this.detectorIndices = detectorIndices;
this.monitorService = new MonitorService(client);
this.workflowService = new WorkflowService(client, monitorService);
this.clusterService = clusterService;
this.settings = settings;

this.enabledWorkflowUsage = SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE.get(this.settings);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE, this::setEnabledWorkflowUsage);
}

@Override
Expand Down Expand Up @@ -155,39 +180,63 @@ public void onFailure(Exception t) {
}

private void onGetResponse(Detector detector) {
List<String> monitorIds = detector.getMonitorIds();
ActionListener<DeleteMonitorResponse> deletesListener = new GroupedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Collection<DeleteMonitorResponse> responses) {
SetOnce<RestStatus> errorStatusSupplier = new SetOnce<>();
if (responses.stream().filter(response -> {
if (response.getStatus() != RestStatus.OK) {
log.error("Detector not being deleted because monitor [{}] could not be deleted. Status [{}]", response.getId(), response.getStatus());
errorStatusSupplier.trySet(response.getStatus());
return true;
StepListener<AcknowledgedResponse> onDeleteWorkflowStep = new StepListener<>();
// 1. Delete the workflow if the workflow is supported
deleteWorkflow(detector, onDeleteWorkflowStep);
onDeleteWorkflowStep.whenComplete(acknowledgedResponse -> {
List<String> monitorIds = detector.getMonitorIds();
ActionListener<DeleteMonitorResponse> deletesListener = new GroupedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Collection<DeleteMonitorResponse> responses) {
SetOnce<RestStatus> errorStatusSupplier = new SetOnce<>();
if (responses.stream().filter(response -> {
if (response.getStatus() != RestStatus.OK) {
log.error("Detector not being deleted because monitor [{}] could not be deleted. Status [{}]", response.getId(), response.getStatus());
errorStatusSupplier.trySet(response.getStatus());
return true;
}
return false;
}).count() > 0) {
onFailures(new OpenSearchStatusException("Monitor associated with detected could not be deleted", errorStatusSupplier.get()));
}
return false;
}).count() > 0) {
onFailures(new OpenSearchStatusException("Monitor associated with detected could not be deleted", errorStatusSupplier.get()));
deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy());
}
deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy());
}

@Override
public void onFailure(Exception e) {
if(isOnlyMonitorOrIndexMissingExceptionThrownByGroupedActionListener(e, detector.getId())) {
deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy());
} else {
log.error(String.format(Locale.ROOT, "Failed to delete detector %s", detector.getId()), e);
if (counter.compareAndSet(false, true)) {
finishHim(null, e);
@Override
public void onFailure(Exception e) {
if (isOnlyMonitorOrIndexMissingExceptionThrownByGroupedActionListener(e, detector.getId())) {
deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy());
} else {
log.error(String.format(Locale.ROOT, "Failed to delete detector %s", detector.getId()), e);
if (counter.compareAndSet(false, true)) {
finishHim(null, e);
}
}
}
}, monitorIds.size());
for (String monitorId : monitorIds) {
deleteAlertingMonitor(monitorId, request.getRefreshPolicy(),
deletesListener);
}
}, e -> {
if (counter.compareAndSet(false, true)) {
finishHim(null, e);
}
}, monitorIds.size());
for (String monitorId : monitorIds) {
deleteAlertingMonitor(monitorId, request.getRefreshPolicy(),
deletesListener);
});
}

private void deleteWorkflow(Detector detector, ActionListener<AcknowledgedResponse> actionListener) {
if (detector.isWorkflowSupported() && enabledWorkflowUsage) {
var workflowId = detector.getWorkflowIds().get(0);
log.debug(String.format("Deleting the workflow %s before deleting the detector", workflowId));
StepListener<DeleteWorkflowResponse> onDeleteWorkflowStep = new StepListener<>();
workflowService.deleteWorkflow(workflowId, onDeleteWorkflowStep);
onDeleteWorkflowStep.whenComplete(deleteWorkflowResponse -> {
actionListener.onResponse(new AcknowledgedResponse(true));
}, actionListener::onFailure);
} else {
// If detector doesn't have the workflows it means that older version of the plugin is used and just skip the step
actionListener.onResponse(new AcknowledgedResponse(true));
}
}

Expand All @@ -211,6 +260,7 @@ public void onFailure(Exception e) {
});

}

@Override
public void onFailure(Exception t) {
onFailures(t);
Expand All @@ -235,7 +285,7 @@ private void onFailures(Exception t) {
private void finishHim(String detectorId, Exception t) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(ActionRunnable.supply(listener, () -> {
if (t != null) {
log.error(String.format(Locale.ROOT, "Failed to delete detector %s",detectorId), t);
log.error(String.format(Locale.ROOT, "Failed to delete detector %s", detectorId), t);
if (t instanceof OpenSearchStatusException) {
throw t;
}
Expand All @@ -256,8 +306,8 @@ private boolean isOnlyMonitorOrIndexMissingExceptionThrownByGroupedActionListene
for (int i = 0; i <= len; i++) {
Throwable e = i == len ? ex : ex.getSuppressed()[i];
if (e.getMessage().matches("(.*)Monitor(.*) is not found(.*)")
|| e.getMessage().contains(
"Configured indices are not found: [.opendistro-alerting-config]")
|| e.getMessage().contains(
"Configured indices are not found: [.opendistro-alerting-config]")
) {
log.error(
String.format(Locale.ROOT, "Monitor or jobs index already deleted." +
Expand All @@ -270,4 +320,8 @@ private boolean isOnlyMonitorOrIndexMissingExceptionThrownByGroupedActionListene
return true;
}
}

private void setEnabledWorkflowUsage(boolean enabledWorkflowUsage) {
this.enabledWorkflowUsage = enabledWorkflowUsage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected void doExecute(Task task, GetFindingsRequest request, ActionListener<G
actionListener
);
} else {
// "detector" is nested type so we have to use nested query
// "detector" is nested type, so we have to use nested query
NestedQueryBuilder queryBuilder =
QueryBuilders.nestedQuery(
"detector",
Expand Down
Loading