-
Notifications
You must be signed in to change notification settings - Fork 76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extended detector use-cases to consider the workflow #394
Changes from 1 commit
13d83a2
8d2b5c9
1aee052
8ad56b2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,6 +51,8 @@ public class Detector implements Writeable, ToXContentObject { | |
public static final String ENABLED_TIME_FIELD = "enabled_time"; | ||
public static final String ALERTING_MONITOR_ID = "monitor_id"; | ||
|
||
public static final String ALERTING_WORKFLOW_ID = "workflow_ids"; | ||
|
||
public static final String BUCKET_MONITOR_ID_RULE_ID = "bucket_monitor_id_rule_id"; | ||
private static final String RULE_TOPIC_INDEX = "rule_topic_index"; | ||
|
||
|
@@ -98,6 +100,8 @@ public class Detector implements Writeable, ToXContentObject { | |
|
||
private Map<String, String> ruleIdMonitorIdMap; | ||
|
||
private List<String> workflowIds; | ||
|
||
private String ruleIndex; | ||
|
||
private String alertsIndex; | ||
|
@@ -116,7 +120,7 @@ public Detector(String id, Long version, String name, Boolean enabled, Schedule | |
Instant lastUpdateTime, Instant enabledTime, DetectorType detectorType, | ||
User user, List<DetectorInput> inputs, List<DetectorTrigger> triggers, List<String> monitorIds, | ||
String ruleIndex, String alertsIndex, String alertsHistoryIndex, String alertsHistoryIndexPattern, | ||
String findingsIndex, String findingsIndexPattern, Map<String, String> rulePerMonitor) { | ||
String findingsIndex, String findingsIndexPattern, Map<String, String> rulePerMonitor, List<String> workflowIds) { | ||
this.type = DETECTOR_TYPE; | ||
|
||
this.id = id != null ? id : NO_ID; | ||
|
@@ -138,6 +142,7 @@ public Detector(String id, Long version, String name, Boolean enabled, Schedule | |
this.findingsIndex = findingsIndex; | ||
this.findingsIndexPattern = findingsIndexPattern; | ||
this.ruleIdMonitorIdMap = rulePerMonitor; | ||
this.workflowIds = workflowIds != null ? workflowIds : Collections.emptyList(); | ||
|
||
if (enabled) { | ||
Objects.requireNonNull(enabledTime); | ||
|
@@ -164,7 +169,8 @@ public Detector(StreamInput sin) throws IOException { | |
sin.readString(), | ||
sin.readString(), | ||
sin.readString(), | ||
sin.readMap(StreamInput::readString, StreamInput::readString) | ||
sin.readMap(StreamInput::readString, StreamInput::readString), | ||
sin.readStringList() | ||
); | ||
} | ||
|
||
|
@@ -199,6 +205,10 @@ public void writeTo(StreamOutput out) throws IOException { | |
out.writeString(ruleIndex); | ||
|
||
out.writeMap(ruleIdMonitorIdMap, StreamOutput::writeString, StreamOutput::writeString); | ||
|
||
if (workflowIds != null) { | ||
out.writeStringCollection(workflowIds); | ||
} | ||
} | ||
|
||
public XContentBuilder toXContentWithUser(XContentBuilder builder, Params params) throws IOException { | ||
|
@@ -287,6 +297,14 @@ private XContentBuilder createXContentBuilder(XContentBuilder builder, ToXConten | |
} | ||
|
||
builder.field(ALERTING_MONITOR_ID, monitorIds); | ||
|
||
if (workflowIds == null) { | ||
builder.nullField(ALERTING_WORKFLOW_ID); | ||
} else { | ||
builder.field(ALERTING_WORKFLOW_ID, workflowIds); | ||
} | ||
|
||
|
||
builder.field(BUCKET_MONITOR_ID_RULE_ID, ruleIdMonitorIdMap); | ||
builder.field(RULE_TOPIC_INDEX, ruleIndex); | ||
builder.field(ALERTS_INDEX, alertsIndex); | ||
|
@@ -332,6 +350,7 @@ public static Detector parse(XContentParser xcp, String id, Long version) throws | |
List<DetectorInput> inputs = new ArrayList<>(); | ||
List<DetectorTrigger> triggers = new ArrayList<>(); | ||
List<String> monitorIds = new ArrayList<>(); | ||
List<String> workflowIds = new ArrayList<>(); | ||
Map<String, String> rulePerMonitor = new HashMap<>(); | ||
|
||
String ruleIndex = null; | ||
|
@@ -412,6 +431,13 @@ public static Detector parse(XContentParser xcp, String id, Long version) throws | |
monitorIds.add(monitorId); | ||
} | ||
break; | ||
case ALERTING_WORKFLOW_ID: | ||
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp); | ||
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { | ||
String workflowId = xcp.text(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. null check? |
||
workflowIds.add(workflowId); | ||
} | ||
break; | ||
case BUCKET_MONITOR_ID_RULE_ID: | ||
rulePerMonitor= xcp.mapStrings(); | ||
break; | ||
|
@@ -463,7 +489,9 @@ public static Detector parse(XContentParser xcp, String id, Long version) throws | |
alertsHistoryIndexPattern, | ||
findingsIndex, | ||
findingsIndexPattern, | ||
rulePerMonitor); | ||
rulePerMonitor, | ||
workflowIds | ||
); | ||
} | ||
|
||
public static Detector readFrom(StreamInput sin) throws IOException { | ||
|
@@ -599,10 +627,22 @@ public void setRuleIdMonitorIdMap(Map<String, String> ruleIdMonitorIdMap) { | |
this.ruleIdMonitorIdMap = ruleIdMonitorIdMap; | ||
} | ||
|
||
public void setWorkflowIds(List<String> workflowIds) { | ||
this.workflowIds = workflowIds; | ||
} | ||
|
||
public List<String> getWorkflowIds() { | ||
return workflowIds; | ||
} | ||
|
||
public String getDocLevelMonitorId() { | ||
return ruleIdMonitorIdMap.get(DOC_LEVEL_MONITOR); | ||
} | ||
|
||
public boolean isWorkflowSupported() { | ||
return workflowIds != null && !workflowIds.isEmpty(); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) return true; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,7 @@ | |
import org.opensearch.commons.alerting.AlertingPluginInterface; | ||
import org.opensearch.commons.alerting.action.DeleteMonitorRequest; | ||
import org.opensearch.commons.alerting.action.DeleteMonitorResponse; | ||
import org.opensearch.commons.alerting.action.DeleteWorkflowResponse; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.core.xcontent.XContentParser; | ||
import org.opensearch.rest.RestStatus; | ||
|
@@ -40,8 +41,10 @@ | |
import org.opensearch.securityanalytics.action.DeleteDetectorResponse; | ||
import org.opensearch.securityanalytics.mapper.IndexTemplateManager; | ||
import org.opensearch.securityanalytics.model.Detector; | ||
import org.opensearch.securityanalytics.util.MonitorService; | ||
import org.opensearch.securityanalytics.util.RuleTopicIndices; | ||
import org.opensearch.securityanalytics.util.SecurityAnalyticsException; | ||
import org.opensearch.securityanalytics.util.WorkflowService; | ||
import org.opensearch.tasks.Task; | ||
import org.opensearch.threadpool.ThreadPool; | ||
import org.opensearch.transport.TransportService; | ||
|
@@ -59,6 +62,10 @@ public class TransportDeleteDetectorAction extends HandledTransportAction<Delete | |
|
||
private final NamedXContentRegistry xContentRegistry; | ||
|
||
private final WorkflowService workflowService; | ||
|
||
private final MonitorService monitorService; | ||
|
||
private final ThreadPool threadPool; | ||
|
||
private final IndexTemplateManager indexTemplateManager; | ||
|
@@ -71,6 +78,9 @@ public TransportDeleteDetectorAction(TransportService transportService, IndexTem | |
this.xContentRegistry = xContentRegistry; | ||
this.threadPool = client.threadPool(); | ||
this.indexTemplateManager = indexTemplateManager; | ||
this.monitorService = new MonitorService(client); | ||
this.workflowService = new WorkflowService(client, monitorService); | ||
|
||
} | ||
|
||
@Override | ||
|
@@ -137,35 +147,57 @@ public void onFailure(Exception t) { | |
} | ||
|
||
private void onGetResponse(Detector detector) { | ||
List<String> monitorIds = detector.getMonitorIds(); | ||
String ruleIndex = detector.getRuleIndex(); | ||
ActionListener<DeleteMonitorResponse> deletesListener = new GroupedActionListener<>(new ActionListener<>() { | ||
@Override | ||
public void onResponse(Collection<DeleteMonitorResponse> responses) { | ||
SetOnce<RestStatus> errorStatusSupplier = new SetOnce<>(); | ||
if (responses.stream().filter(response -> { | ||
if (response.getStatus() != RestStatus.OK) { | ||
log.error("Detector not being deleted because monitor [{}] could not be deleted. Status [{}]", response.getId(), response.getStatus()); | ||
errorStatusSupplier.trySet(response.getStatus()); | ||
return true; | ||
// If detector doesn't have the workflows it means that older version of the plugin is used | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can use step listener instead of repeating logic There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
if (detector.isWorkflowSupported()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. debug log "deleting workflow before deleting detector |
||
// 1. Delete workflow | ||
workflowService.deleteWorkflow(detector.getWorkflowIds().get(0), | ||
new ActionListener<>() { | ||
@Override | ||
public void onResponse(DeleteWorkflowResponse deleteWorkflowResponse) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. debug log "workflow deleted. deleting monitors before detector deletion" |
||
// 2. Delete related monitors | ||
monitorService.deleteAlertingMonitors(detector.getMonitorIds(), | ||
request.getRefreshPolicy(), | ||
new ActionListener<>() { | ||
@Override | ||
public void onResponse(List<DeleteMonitorResponse> deleteMonitorResponses) { | ||
// 3. Delete detector | ||
deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy()); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
if (counter.compareAndSet(false, true)) { | ||
finishHim(null, e); | ||
} | ||
} | ||
}); | ||
} | ||
return false; | ||
}).count() > 0) { | ||
onFailures(new OpenSearchStatusException("Monitor associated with detected could not be deleted", errorStatusSupplier.get())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this removed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is not removed - it's extracted to monitor service. Check it out here |
||
} | ||
deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy()); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
if (counter.compareAndSet(false, true)) { | ||
finishHim(null, e); | ||
} | ||
} | ||
}, monitorIds.size()); | ||
for (String monitorId : monitorIds) { | ||
deleteAlertingMonitor(monitorId, request.getRefreshPolicy(), | ||
deletesListener); | ||
@Override | ||
public void onFailure(Exception e) { | ||
if (counter.compareAndSet(false, true)) { | ||
finishHim(null, e); | ||
} | ||
} | ||
}); | ||
} else { | ||
// 1. Delete monitors | ||
monitorService.deleteAlertingMonitors(detector.getMonitorIds(), | ||
request.getRefreshPolicy(), | ||
new ActionListener<>() { | ||
@Override | ||
public void onResponse(List<DeleteMonitorResponse> deleteMonitorResponses) { | ||
// 2. Delete detector | ||
deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy()); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
if (counter.compareAndSet(false, true)) { | ||
finishHim(null, e); | ||
} | ||
} | ||
}); | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: shouldnt we have single workflow id?
Any motivations for list?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hope we are not exposing workflow id in response serialized to user
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added list because I though it's more generic solution - which will allow us (if we are going consider that scenario in one moment) to add multiple workflows without taking care about the detectors created in the period once we had only one workflow (and creating special case branches in the code with some if-else logic).
Does this makes sense? If not, I will remove it