Skip to content

Commit

Permalink
[Backport 2.5] Manually backported PRs 384, and 396. (#460)
Browse files Browse the repository at this point in the history
* handle index not exists for detector search and delete (#396) (#397)

Signed-off-by: Surya Sashank Nistala <[email protected]>
(cherry picked from commit e50c70a)

Co-authored-by: Surya Sashank Nistala <[email protected]>

* Handle monitor or monitor index not exists during detector deletion (#384) (#398)

Signed-off-by: Surya Sashank Nistala <[email protected]>
(cherry picked from commit 6271399)

Co-authored-by: Surya Sashank Nistala <[email protected]>
Signed-off-by: AWSHurneyt <[email protected]>

---------

Signed-off-by: AWSHurneyt <[email protected]>
Co-authored-by: opensearch-trigger-bot[bot] <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com>
Co-authored-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
3 people authored Jun 9, 2023
1 parent 36892bf commit 570b205
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.inject.Inject;
Expand All @@ -36,13 +34,12 @@
import org.opensearch.securityanalytics.action.DeleteDetectorRequest;
import org.opensearch.securityanalytics.action.DeleteDetectorResponse;
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
Expand All @@ -63,18 +60,21 @@ public class TransportDeleteDetectorAction extends HandledTransportAction<Delete

private final ThreadPool threadPool;

private final DetectorIndices detectorIndices;

@Inject
public TransportDeleteDetectorAction(TransportService transportService, Client client, ActionFilters actionFilters, NamedXContentRegistry xContentRegistry, RuleTopicIndices ruleTopicIndices) {
public TransportDeleteDetectorAction(TransportService transportService, Client client, ActionFilters actionFilters, NamedXContentRegistry xContentRegistry, RuleTopicIndices ruleTopicIndices, DetectorIndices detectorIndices) {
super(DeleteDetectorAction.NAME, transportService, actionFilters, DeleteDetectorRequest::new);
this.client = client;
this.ruleTopicIndices = ruleTopicIndices;
this.xContentRegistry = xContentRegistry;
this.threadPool = client.threadPool();
this.detectorIndices = detectorIndices;
}

@Override
protected void doExecute(Task task, DeleteDetectorRequest request, ActionListener<DeleteDetectorResponse> listener) {
AsyncDeleteDetectorAction asyncAction = new AsyncDeleteDetectorAction(task, request, listener);
AsyncDeleteDetectorAction asyncAction = new AsyncDeleteDetectorAction(task, request, listener, detectorIndices);
asyncAction.start();
}

Expand All @@ -95,17 +95,31 @@ class AsyncDeleteDetectorAction {
private final ActionListener<DeleteDetectorResponse> listener;
private final AtomicReference<Object> response;
private final AtomicBoolean counter = new AtomicBoolean();
private final DetectorIndices detectorIndices;
private final Task task;

AsyncDeleteDetectorAction(Task task, DeleteDetectorRequest request, ActionListener<DeleteDetectorResponse> listener) {
AsyncDeleteDetectorAction(
Task task,
DeleteDetectorRequest request,
ActionListener<DeleteDetectorResponse> listener,
DetectorIndices detectorIndices) {
this.task = task;
this.request = request;
this.listener = listener;

this.response = new AtomicReference<>();
this.detectorIndices = detectorIndices;
}

void start() {
if (!detectorIndices.detectorIndexExists()) {
onFailures(new OpenSearchStatusException(
String.format(Locale.getDefault(),
"Detector with %s is not found",
request.getDetectorId()),
RestStatus.NOT_FOUND));
return;

}
TransportDeleteDetectorAction.this.threadPool.getThreadContext().stashContext();
String detectorId = request.getDetectorId();
GetRequest getRequest = new GetRequest(Detector.DETECTORS_INDEX, detectorId);
Expand Down Expand Up @@ -137,7 +151,6 @@ public void onFailure(Exception t) {

private void onGetResponse(Detector detector) {
List<String> monitorIds = detector.getMonitorIds();
String ruleIndex = detector.getRuleIndex();
ActionListener<DeleteMonitorResponse> deletesListener = new GroupedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Collection<DeleteMonitorResponse> responses) {
Expand All @@ -157,8 +170,13 @@ public void onResponse(Collection<DeleteMonitorResponse> responses) {

@Override
public void onFailure(Exception e) {
if (counter.compareAndSet(false, true)) {
finishHim(null, e);
if(isOnlyMonitorOrIndexMissingExceptionThrownByGroupedActionListener(e, detector.getId())) {
deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy());
} else {
log.error(String.format(Locale.ROOT, "Failed to delete detector %s", detector.getId()), e);
if (counter.compareAndSet(false, true)) {
finishHim(null, e);
}
}
}
}, monitorIds.size());
Expand Down Expand Up @@ -191,6 +209,7 @@ private void onOperation(DeleteResponse response) {
}

private void onFailures(Exception t) {
log.error(String.format(Locale.ROOT, "Failed to delete detector"));
if (counter.compareAndSet(false, true)) {
finishHim(null, t);
}
Expand All @@ -199,6 +218,7 @@ private void onFailures(Exception t) {
private void finishHim(String detectorId, Exception t) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(ActionRunnable.supply(listener, () -> {
if (t != null) {
log.error(String.format(Locale.ROOT, "Failed to delete detector %s",detectorId), t);
if (t instanceof OpenSearchStatusException) {
throw t;
}
Expand All @@ -208,5 +228,29 @@ private void finishHim(String detectorId, Exception t) {
}
}));
}

private boolean isOnlyMonitorOrIndexMissingExceptionThrownByGroupedActionListener(
Exception ex,
String detectorId
) {
// grouped action listener listens on mutliple listeners but throws only one exception. If multiple
// listeners fail the other exceptions are added as suppressed exceptions to the first failure.
int len = ex.getSuppressed().length;
for (int i = 0; i <= len; i++) {
Throwable e = i == len ? ex : ex.getSuppressed()[i];
if (e.getMessage().matches("(.*)Monitor(.*) is not found(.*)")
|| e.getMessage().contains(
"Configured indices are not found: [.opendistro-alerting-config]")
) {
log.error(
String.format(Locale.ROOT, "Monitor or jobs index already deleted." +
" Proceeding with detector %s deletion", detectorId),
e);
} else {
return false;
}
}
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import org.apache.lucene.search.TotalHits;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchResponse;

import org.opensearch.action.search.SearchResponseSections;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.commons.authuser.User;
Expand All @@ -20,6 +24,12 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.rest.RestStatus;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.suggest.Suggest;
import org.opensearch.securityanalytics.action.SearchDetectorAction;
import org.opensearch.securityanalytics.action.SearchDetectorRequest;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
Expand All @@ -30,7 +40,11 @@
import org.opensearch.transport.TransportService;


import java.util.Collections;
import java.util.Locale;

import static org.opensearch.rest.RestStatus.OK;
import static org.opensearch.securityanalytics.util.DetectorUtils.getEmptySearchResponse;

public class TransportSearchDetectorAction extends HandledTransportAction<SearchDetectorRequest, SearchResponse> implements SecureTransportAction {

Expand Down Expand Up @@ -78,7 +92,10 @@ protected void doExecute(Task task, SearchDetectorRequest searchDetectorRequest,
}

this.threadPool.getThreadContext().stashContext();

if (!detectorIndices.detectorIndexExists()) {
actionListener.onResponse(getEmptySearchResponse());
return;
}
client.search(searchDetectorRequest.searchRequest(), new ActionListener<>() {
@Override
public void onResponse(SearchResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,23 @@
*/
package org.opensearch.securityanalytics.util;

import org.apache.lucene.search.TotalHits;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.suggest.Suggest;
import org.opensearch.securityanalytics.model.Detector;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

Expand All @@ -25,6 +29,16 @@ public class DetectorUtils {
public static final String DETECTOR_TYPE_PATH = "detector.detector_type";
public static final String DETECTOR_ID_FIELD = "detector_id";

public static SearchResponse getEmptySearchResponse() {
return new SearchResponse(new InternalSearchResponse(
new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f),
InternalAggregations.from(Collections.emptyList()),
new Suggest(Collections.emptyList()),
new SearchProfileShardResults(Collections.emptyMap()), false, false, 0),
"", 0, 0, 0, 0,
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
}

public static List<Detector> getDetectors(SearchResponse response, NamedXContentRegistry xContentRegistry) throws IOException {
List<Detector> detectors = new LinkedList<>();
for (SearchHit hit : response.getHits()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,18 @@ protected Response executeAlertingMonitor(RestClient client, String monitorId, M
return makeRequest(client, "POST", String.format(Locale.getDefault(), "/_plugins/_alerting/monitors/%s/_execute", monitorId), params, null);
}

protected Response deleteAlertingMonitorIndex() throws IOException {
return makeRequest(client(), "DELETE", String.format(Locale.getDefault(), "/.opendistro-alerting-config"), new HashMap<>(), null);
}

protected Response deleteAlertingMonitor(String monitorId) throws IOException {
return deleteAlertingMonitor(client(), monitorId);
}

protected Response deleteAlertingMonitor(RestClient client, String monitorId) throws IOException {
return makeRequest(client, "DELETE", String.format(Locale.getDefault(), "/_plugins/_alerting/monitors/%s", monitorId), new HashMap<>(), null);
}

protected List<SearchHit> executeSearch(String index, String request) throws IOException {
return executeSearch(index, request, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,48 @@

public class DetectorRestApiIT extends SecurityAnalyticsRestTestCase {

@SuppressWarnings("unchecked")
public void testDeletingADetector_MonitorNotExists() throws IOException {
String index = createTestIndex(randomIndex(), windowsIndexMapping());

// Execute CreateMappingsAction to add alias mapping for index
Request createMappingRequest = new Request("POST", SecurityAnalyticsPlugin.MAPPER_BASE_URI);
// both req params and req body are supported
createMappingRequest.setJsonEntity(
"{ \"index_name\":\"" + index + "\"," +
" \"rule_topic\":\"" + randomDetectorType() + "\", " +
" \"partial\":true" +
"}"
);

Response response = client().performRequest(createMappingRequest);
assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
// Create detector #1 of type test_windows
Detector detector1 = randomDetectorWithTriggers(getRandomPrePackagedRules(), List.of(new DetectorTrigger(null, "test-trigger", "1", List.of(randomDetectorType()), List.of(), List.of(), List.of(), List.of())));
String detectorId1 = createDetector(detector1);

String request = "{\n" +
" \"query\" : {\n" +
" \"match\":{\n" +
" \"_id\": \"" + detectorId1 + "\"\n" +
" }\n" +
" }\n" +
"}";
List<SearchHit> hits = executeSearch(Detector.DETECTORS_INDEX, request);
SearchHit hit = hits.get(0);

String monitorId = ((List<String>) ((Map<String, Object>) hit.getSourceAsMap().get("detector")).get("monitor_id")).get(0);

Response deleteMonitorResponse = deleteAlertingMonitor(monitorId);
assertEquals(200, deleteMonitorResponse.getStatusLine().getStatusCode());
entityAsMap(deleteMonitorResponse);

Response deleteResponse = makeRequest(client(), "DELETE", SecurityAnalyticsPlugin.DETECTOR_BASE_URI + "/" + detectorId1, Collections.emptyMap(), null);
Assert.assertEquals("Delete detector failed", RestStatus.OK, restStatus(deleteResponse));
hits = executeSearch(Detector.DETECTORS_INDEX, request);
Assert.assertEquals(0, hits.size());
}

@SuppressWarnings("unchecked")
public void testCreatingADetector() throws IOException {
String index = createTestIndex(randomIndex(), windowsIndexMapping());
Expand Down Expand Up @@ -100,6 +142,29 @@ public void testCreatingADetector() throws IOException {
Assert.assertEquals(5, noOfSigmaRuleMatches);
}

@SuppressWarnings("unchecked")
public void test_searchDetectors_detectorsIndexNotExists() throws IOException {
try {
makeRequest(client(), "DELETE", SecurityAnalyticsPlugin.DETECTOR_BASE_URI + "/" + "d1", Collections.emptyMap(), null);
fail("delete detector call should have failed");
} catch (IOException e) {
assertTrue(e.getMessage().contains("not found"));
}
String request = "{\n" +
" \"query\" : {\n" +
" \"match_all\":{\n" +
" }\n" +
" }\n" +
"}";
HttpEntity requestEntity = new StringEntity(request, ContentType.APPLICATION_JSON);
Response searchResponse = makeRequest(client(), "POST", SecurityAnalyticsPlugin.DETECTOR_BASE_URI + "/" + "_search", Collections.emptyMap(), requestEntity);
Map<String, Object> searchResponseBody = asMap(searchResponse);
Assert.assertNotNull("response is not null", searchResponseBody);
Map<String, Object> searchResponseHits = (Map) searchResponseBody.get("hits");
Map<String, Object> searchResponseTotal = (Map) searchResponseHits.get("total");
Assert.assertEquals(0, searchResponseTotal.get("value"));
}

public void testCreatingADetectorWithIndexNotExists() throws IOException {
Detector detector = randomDetectorWithTriggers(getRandomPrePackagedRules(), List.of(new DetectorTrigger(null, "test-trigger", "1", List.of(randomDetectorType()), List.of(), List.of(), List.of(), List.of())));

Expand Down

0 comments on commit 570b205

Please sign in to comment.