Skip to content

Commit

Permalink
Using alerting workflows in detectors (opensearch-project#394)
Browse files Browse the repository at this point in the history
Enables workflow creation - all the monitors related with the given detector will be part of the detector
Enables workflow update during detector update
Deletes the workflow during the detector deletion
Adds additional field to a detector

Signed-off-by: Stevan Buzejic <[email protected]>
  • Loading branch information
stevanbz authored and eirsep committed Aug 24, 2023
1 parent 3d8da02 commit e612fd5
Show file tree
Hide file tree
Showing 16 changed files with 1,411 additions and 228 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ public List<Setting<?>> getSettings() {
SecurityAnalyticsSettings.FINDING_HISTORY_RETENTION_PERIOD,
SecurityAnalyticsSettings.IS_CORRELATION_INDEX_SETTING,
SecurityAnalyticsSettings.CORRELATION_TIME_WINDOW,
SecurityAnalyticsSettings.DEFAULT_MAPPING_SCHEMA
SecurityAnalyticsSettings.DEFAULT_MAPPING_SCHEMA,
SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE
);
}

Expand Down
49 changes: 45 additions & 4 deletions src/main/java/org/opensearch/securityanalytics/model/Detector.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class Detector implements Writeable, ToXContentObject {
public static final String ENABLED_TIME_FIELD = "enabled_time";
public static final String ALERTING_MONITOR_ID = "monitor_id";

public static final String ALERTING_WORKFLOW_ID = "workflow_ids";

public static final String BUCKET_MONITOR_ID_RULE_ID = "bucket_monitor_id_rule_id";
private static final String RULE_TOPIC_INDEX = "rule_topic_index";

Expand Down Expand Up @@ -99,6 +101,8 @@ public class Detector implements Writeable, ToXContentObject {

private Map<String, String> ruleIdMonitorIdMap;

private List<String> workflowIds;

private String ruleIndex;

private String alertsIndex;
Expand All @@ -117,7 +121,7 @@ public Detector(String id, Long version, String name, Boolean enabled, Schedule
Instant lastUpdateTime, Instant enabledTime, String logType,
User user, List<DetectorInput> inputs, List<DetectorTrigger> triggers, List<String> monitorIds,
String ruleIndex, String alertsIndex, String alertsHistoryIndex, String alertsHistoryIndexPattern,
String findingsIndex, String findingsIndexPattern, Map<String, String> rulePerMonitor) {
String findingsIndex, String findingsIndexPattern, Map<String, String> rulePerMonitor, List<String> workflowIds) {
this.type = DETECTOR_TYPE;

this.id = id != null ? id : NO_ID;
Expand All @@ -139,6 +143,7 @@ public Detector(String id, Long version, String name, Boolean enabled, Schedule
this.findingsIndexPattern = findingsIndexPattern;
this.ruleIdMonitorIdMap = rulePerMonitor;
this.logType = logType;
this.workflowIds = workflowIds != null ? workflowIds : null;

if (enabled) {
Objects.requireNonNull(enabledTime);
Expand All @@ -165,7 +170,8 @@ public Detector(StreamInput sin) throws IOException {
sin.readString(),
sin.readString(),
sin.readString(),
sin.readMap(StreamInput::readString, StreamInput::readString)
sin.readMap(StreamInput::readString, StreamInput::readString),
sin.readStringList()
);
}

Expand Down Expand Up @@ -200,6 +206,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(ruleIndex);

out.writeMap(ruleIdMonitorIdMap, StreamOutput::writeString, StreamOutput::writeString);

if (workflowIds != null) {
out.writeStringCollection(workflowIds);
}
}

public XContentBuilder toXContentWithUser(XContentBuilder builder, Params params) throws IOException {
Expand Down Expand Up @@ -253,6 +263,14 @@ private XContentBuilder createXContentBuilder(XContentBuilder builder, ToXConten
}

builder.field(ALERTING_MONITOR_ID, monitorIds);

if (workflowIds == null) {
builder.nullField(ALERTING_WORKFLOW_ID);
} else {
builder.field(ALERTING_WORKFLOW_ID, workflowIds);
}


builder.field(BUCKET_MONITOR_ID_RULE_ID, ruleIdMonitorIdMap);
builder.field(RULE_TOPIC_INDEX, ruleIndex);
builder.field(ALERTS_INDEX, alertsIndex);
Expand Down Expand Up @@ -299,6 +317,7 @@ public static Detector parse(XContentParser xcp, String id, Long version) throws
List<DetectorInput> inputs = new ArrayList<>();
List<DetectorTrigger> triggers = new ArrayList<>();
List<String> monitorIds = new ArrayList<>();
List<String> workflowIds = new ArrayList<>();
Map<String, String> rulePerMonitor = new HashMap<>();

String ruleIndex = null;
Expand Down Expand Up @@ -374,6 +393,15 @@ public static Detector parse(XContentParser xcp, String id, Long version) throws
monitorIds.add(monitorId);
}
break;
case ALERTING_WORKFLOW_ID:
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp);
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
String workflowId = xcp.textOrNull();
if (workflowId != null) {
workflowIds.add(workflowId);
}
}
break;
case BUCKET_MONITOR_ID_RULE_ID:
rulePerMonitor= xcp.mapStrings();
break;
Expand Down Expand Up @@ -429,8 +457,9 @@ public static Detector parse(XContentParser xcp, String id, Long version) throws
alertsHistoryIndexPattern,
findingsIndex,
findingsIndexPattern,
rulePerMonitor
);
rulePerMonitor,
workflowIds
);
}

public static Detector readFrom(StreamInput sin) throws IOException {
Expand Down Expand Up @@ -566,10 +595,22 @@ public void setRuleIdMonitorIdMap(Map<String, String> ruleIdMonitorIdMap) {
this.ruleIdMonitorIdMap = ruleIdMonitorIdMap;
}

public void setWorkflowIds(List<String> workflowIds) {
this.workflowIds = workflowIds;
}

public List<String> getWorkflowIds() {
return workflowIds;
}

public String getDocLevelMonitorId() {
return ruleIdMonitorIdMap.get(DOC_LEVEL_MONITOR);
}

public boolean isWorkflowSupported() {
return workflowIds != null && !workflowIds.isEmpty();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,10 @@ public class SecurityAnalyticsSettings {
"ecs",
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<Boolean> ENABLE_WORKFLOW_USAGE = Setting.boolSetting(
"plugins.security_analytics.enable_workflow_usage",
false,
Setting.Property.NodeScope, Setting.Property.Dynamic
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@
*/
package org.opensearch.securityanalytics.transport;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.SetOnce;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.StepListener;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
Expand All @@ -25,28 +20,41 @@
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SetOnce;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.alerting.AlertingPluginInterface;
import org.opensearch.commons.alerting.action.DeleteMonitorRequest;
import org.opensearch.commons.alerting.action.DeleteMonitorResponse;
import org.opensearch.commons.alerting.action.DeleteWorkflowResponse;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.extensions.AcknowledgedResponse;
import org.opensearch.rest.RestStatus;
import org.opensearch.securityanalytics.action.DeleteDetectorAction;
import org.opensearch.securityanalytics.action.DeleteDetectorRequest;
import org.opensearch.securityanalytics.action.DeleteDetectorResponse;
import org.opensearch.securityanalytics.mapper.IndexTemplateManager;
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.MonitorService;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.securityanalytics.util.WorkflowService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.securityanalytics.model.Detector.NO_VERSION;

Expand All @@ -60,21 +68,40 @@ public class TransportDeleteDetectorAction extends HandledTransportAction<Delete

private final NamedXContentRegistry xContentRegistry;

private final WorkflowService workflowService;

private final MonitorService monitorService;

private final ThreadPool threadPool;

private final Settings settings;

private final ClusterService clusterService;
private volatile Boolean enabledWorkflowUsage;

private final IndexTemplateManager indexTemplateManager;

private final DetectorIndices detectorIndices;

@Inject
public TransportDeleteDetectorAction(TransportService transportService, IndexTemplateManager indexTemplateManager, Client client, ActionFilters actionFilters, NamedXContentRegistry xContentRegistry, RuleTopicIndices ruleTopicIndices, DetectorIndices detectorIndices) {
public TransportDeleteDetectorAction(TransportService transportService, IndexTemplateManager indexTemplateManager, Client client, ActionFilters actionFilters, NamedXContentRegistry xContentRegistry, RuleTopicIndices ruleTopicIndices, DetectorIndices detectorIndices,
ClusterService clusterService,
Settings settings) {
super(DeleteDetectorAction.NAME, transportService, actionFilters, DeleteDetectorRequest::new);
this.client = client;
this.ruleTopicIndices = ruleTopicIndices;
this.xContentRegistry = xContentRegistry;
this.threadPool = client.threadPool();
this.indexTemplateManager = indexTemplateManager;
this.detectorIndices = detectorIndices;
this.monitorService = new MonitorService(client);
this.workflowService = new WorkflowService(client, monitorService);
this.clusterService = clusterService;
this.settings = settings;

this.enabledWorkflowUsage = SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE.get(this.settings);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE, this::setEnabledWorkflowUsage);

}

@Override
Expand Down Expand Up @@ -191,6 +218,21 @@ public void onFailure(Exception e) {
}
}

private void deleteWorkflow(Detector detector, ActionListener<AcknowledgedResponse> actionListener) {
if (detector.isWorkflowSupported() && enabledWorkflowUsage) {
var workflowId = detector.getWorkflowIds().get(0);
log.debug(String.format("Deleting the workflow %s before deleting the detector", workflowId));
StepListener<DeleteWorkflowResponse> onDeleteWorkflowStep = new StepListener<>();
workflowService.deleteWorkflow(workflowId, onDeleteWorkflowStep);
onDeleteWorkflowStep.whenComplete(deleteWorkflowResponse -> {
actionListener.onResponse(new AcknowledgedResponse(true));
}, actionListener::onFailure);
} else {
// If detector doesn't have the workflows it means that older version of the plugin is used and just skip the step
actionListener.onResponse(new AcknowledgedResponse(true));
}
}

private void deleteDetectorFromConfig(String detectorId, WriteRequest.RefreshPolicy refreshPolicy) {
deleteDetector(detectorId, refreshPolicy,
new ActionListener<>() {
Expand Down Expand Up @@ -270,4 +312,7 @@ private boolean isOnlyMonitorOrIndexMissingExceptionThrownByGroupedActionListene
return true;
}
}
private void setEnabledWorkflowUsage(boolean enabledWorkflowUsage) {
this.enabledWorkflowUsage = enabledWorkflowUsage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ protected void doExecute(Task task, GetFindingsRequest request, ActionListener<G
actionListener
);
} else {
// "detector" is nested type so we have to use nested query
// "detector" is nested type, so we have to use nested query
NestedQueryBuilder queryBuilder =
QueryBuilders.nestedQuery(
"detector",
Expand Down
Loading

0 comments on commit e612fd5

Please sign in to comment.