From 0355f8f2c9ac12464c5fc3be4388f039c8bcf29c Mon Sep 17 00:00:00 2001
From: Megha Goyal <goyamegh@amazon.com>
Date: Thu, 7 Mar 2024 14:32:15 -0800
Subject: [PATCH 1/4] Reinstating more leaks plugged-in for correlations
 workflows

Signed-off-by: Megha Goyal <goyamegh@amazon.com>
---
 .../correlation/VectorEmbeddingsEngine.java        | 10 ++++++++++
 .../transport/TransportCorrelateFindingAction.java | 14 +++++++-------
 2 files changed, 17 insertions(+), 7 deletions(-)

diff --git a/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java b/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java
index 86fc70bbd..b367e07bd 100644
--- a/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java
+++ b/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java
@@ -195,6 +195,14 @@ public void insertCorrelatedFindings(String detectorType, Finding finding, Strin
     }
 
     public void insertOrphanFindings(String detectorType, Finding finding, float timestampFeature, Map<String, CustomLogType> logTypes) {
+        if (logTypes.get(detectorType) == null ) {
+            log.error("[PERF-DEBUG] insertOrphanFindings detector type {} {}", detectorType, finding.getId());
+            for (String key : logTypes.keySet()) {
+                log.error("[PERF-DEBUG] keys {}", key);
+            }
+            onFailure(new OpenSearchStatusException("insertOrphanFindings null log types for detector type: " + detectorType, RestStatus.INTERNAL_SERVER_ERROR));
+        }
+
         SearchRequest searchRequest = getSearchMetadataIndexRequest(detectorType, finding, logTypes);
         Map<String, Object> tags = logTypes.get(detectorType).getTags();
         String correlationId = tags.get("correlation_id").toString();
@@ -407,6 +415,8 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
                                             } catch (Exception ex) {
                                                 onFailure(ex);
                                             }
+                                        } else {
+                                            onFailure(new OpenSearchStatusException("Indexing failed", RestStatus.INTERNAL_SERVER_ERROR));
                                         }
                                     }, this::onFailure));
                                 } catch (Exception ex) {
diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java
index d5e0eed32..9ab84dc3a 100644
--- a/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java
+++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java
@@ -172,13 +172,13 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Subscr
                             correlateFindingAction.onFailures(new OpenSearchStatusException("Failed to create correlation Index", RestStatus.INTERNAL_SERVER_ERROR));
                         }
                     }, correlateFindingAction::onFailures));
-                } catch (IOException ex) {
+                } catch (Exception ex) {
                     correlateFindingAction.onFailures(ex);
                 }
             } else {
                 correlateFindingAction.start();
             }
-        } catch (IOException e) {
+        } catch (Exception e) {
             throw new SecurityAnalyticsException("Unknown exception occurred", RestStatus.INTERNAL_SERVER_ERROR, e);
         }
     }
@@ -245,7 +245,7 @@ void start() {
                             );
                             Detector detector = Detector.docParse(xcp, hit.getId(), hit.getVersion());
                             joinEngine.onSearchDetectorResponse(detector, finding);
-                        } catch (IOException e) {
+                        } catch (Exception e) {
                             log.error("IOException for request {}", searchRequest.toString(), e);
                             onFailures(e);
                         }
@@ -277,7 +277,7 @@ public void initCorrelationIndex(String detectorType, Map<String, List<String>>
                 } else {
                     getTimestampFeature(detectorType, correlatedFindings, null, correlationRules);
                 }
-            } catch (IOException ex) {
+            } catch (Exception ex) {
                 onFailures(ex);
             }
         }
@@ -353,7 +353,8 @@ public void getTimestampFeature(String detectorType, Map<String, List<String>> c
                                     }, this::onFailures));
                                 }, this::onFailures));
                             } else {
-                                log.error(new OpenSearchStatusException("Failed to create correlation metadata Index", RestStatus.INTERNAL_SERVER_ERROR));
+                                Exception e = new OpenSearchStatusException("Failed to create correlation metadata Index", RestStatus.INTERNAL_SERVER_ERROR);
+                                onFailures(e);
                             }
                         }, this::onFailures));
                 } else {
@@ -439,12 +440,11 @@ private IndexRequest getCorrelationMetadataIndexRequest(String id, long newScore
             scoreBuilder.field("root", false);
             scoreBuilder.endObject();
 
-            IndexRequest scoreIndexRequest = new IndexRequest(CorrelationIndices.CORRELATION_METADATA_INDEX)
+            return new IndexRequest(CorrelationIndices.CORRELATION_METADATA_INDEX)
                     .id(id)
                     .source(scoreBuilder)
                     .timeout(indexTimeout)
                     .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-            return scoreIndexRequest;
         }
         private void insertFindings(float timestampFeature, SearchRequest searchRequest, Map<String, List<String>> correlatedFindings, String detectorType, List<String> correlationRules, Finding orphanFinding) {
             client.search(searchRequest, ActionListener.wrap(response -> {

From 998c0758c2e19e930bba34e8737866d54d58fb32 Mon Sep 17 00:00:00 2001
From: Megha Goyal <goyamegh@amazon.com>
Date: Thu, 7 Mar 2024 19:42:40 -0800
Subject: [PATCH 2/4] Add search timeouts to all correlation searches

Signed-off-by: Megha Goyal <goyamegh@amazon.com>
---
 .../correlation/JoinEngine.java               |  7 ++
 .../correlation/VectorEmbeddingsEngine.java   |  7 +-
 .../TransportCorrelateFindingAction.java      | 75 +++++++++----------
 3 files changed, 49 insertions(+), 40 deletions(-)

diff --git a/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java b/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java
index b33c4d43b..0d373589e 100644
--- a/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java
+++ b/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java
@@ -10,6 +10,7 @@
 import org.apache.lucene.search.join.ScoreMode;
 import org.opensearch.OpenSearchStatusException;
 import org.opensearch.cluster.routing.Preference;
+import org.opensearch.common.unit.TimeValue;
 import org.opensearch.commons.alerting.model.DocLevelQuery;
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.action.search.MultiSearchRequest;
@@ -132,6 +133,7 @@ private void generateAutoCorrelations(Detector detector, Finding finding) throws
                 searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(logTypeName));
                 searchRequest.source(sourceBuilder);
                 searchRequest.preference(Preference.PRIMARY_FIRST.type());
+                searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
                 mSearchRequest.add(searchRequest);
             }
 
@@ -214,6 +216,7 @@ private void onAutoCorrelations(Detector detector, Finding finding, Map<String,
         searchRequest.indices(CorrelationRule.CORRELATION_RULE_INDEX);
         searchRequest.source(searchSourceBuilder);
         searchRequest.preference(Preference.PRIMARY_FIRST.type());
+        searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
 
         client.search(searchRequest, ActionListener.wrap(response -> {
             if (response.isTimedOut()) {
@@ -277,6 +280,7 @@ private void getValidDocuments(String detectorType, List<String> indices, List<C
                 searchRequest.indices(indices.toArray(new String[]{}));
                 searchRequest.source(searchSourceBuilder);
                 searchRequest.preference(Preference.PRIMARY_FIRST.type());
+                searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
 
                 validCorrelationRules.add(rule);
                 validFields.add(query.get().getField());
@@ -377,6 +381,7 @@ private void searchFindingsByTimestamp(String detectorType, Map<String, List<Cor
             searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(categoryToQueries.getKey()));
             searchRequest.source(searchSourceBuilder);
             searchRequest.preference(Preference.PRIMARY_FIRST.type());
+            searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
             mSearchRequest.add(searchRequest);
             categoryToQueriesPairs.add(Pair.of(categoryToQueries.getKey(), categoryToQueries.getValue()));
         }
@@ -441,6 +446,7 @@ private void searchDocsWithFilterKeys(String detectorType, Map<String, DocSearch
             searchRequest.indices(docSearchCriteria.getValue().indices.toArray(new String[]{}));
             searchRequest.source(searchSourceBuilder);
             searchRequest.preference(Preference.PRIMARY_FIRST.type());
+            searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
 
             categories.add(docSearchCriteria.getKey());
             mSearchRequest.add(searchRequest);
@@ -502,6 +508,7 @@ private void getCorrelatedFindings(String detectorType, Map<String, List<String>
             searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(relatedDocIds.getKey()));
             searchRequest.source(searchSourceBuilder);
             searchRequest.preference(Preference.PRIMARY_FIRST.type());
+            searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
 
             categories.add(relatedDocIds.getKey());
             mSearchRequest.add(searchRequest);
diff --git a/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java b/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java
index b367e07bd..d3d31c7df 100644
--- a/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java
+++ b/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java
@@ -94,6 +94,7 @@ public void insertCorrelatedFindings(String detectorType, Finding finding, Strin
                 request.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
                 request.source(searchSourceBuilder);
                 request.preference(Preference.PRIMARY_FIRST.type());
+                request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
 
                 mSearchRequest.add(request);
             }
@@ -196,9 +197,9 @@ public void insertCorrelatedFindings(String detectorType, Finding finding, Strin
 
     public void insertOrphanFindings(String detectorType, Finding finding, float timestampFeature, Map<String, CustomLogType> logTypes) {
         if (logTypes.get(detectorType) == null ) {
-            log.error("[PERF-DEBUG] insertOrphanFindings detector type {} {}", detectorType, finding.getId());
+            log.debug("[PERF-DEBUG] insertOrphanFindings detector type {} {}", detectorType, finding.getId());
             for (String key : logTypes.keySet()) {
-                log.error("[PERF-DEBUG] keys {}", key);
+                log.debug("[PERF-DEBUG] keys {}", key);
             }
             onFailure(new OpenSearchStatusException("insertOrphanFindings null log types for detector type: " + detectorType, RestStatus.INTERNAL_SERVER_ERROR));
         }
@@ -331,6 +332,7 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
                         request.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
                         request.source(searchSourceBuilder);
                         request.preference(Preference.PRIMARY_FIRST.type());
+                        request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
 
                         client.search(request, ActionListener.wrap(searchResponse -> {
                             if (searchResponse.isTimedOut()) {
@@ -464,6 +466,7 @@ private SearchRequest getSearchMetadataIndexRequest(String detectorType, Finding
         searchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX);
         searchRequest.source(searchSourceBuilder);
         searchRequest.preference(Preference.PRIMARY_FIRST.type());
+        searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
         return searchRequest;
     }
 
diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java
index 9ab84dc3a..aaaca8489 100644
--- a/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java
+++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java
@@ -228,6 +228,7 @@ void start() {
                 searchRequest.indices(Detector.DETECTORS_INDEX);
                 searchRequest.source(searchSourceBuilder);
                 searchRequest.preference(Preference.PRIMARY_FIRST.type());
+                searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
 
                 client.search(searchRequest, ActionListener.wrap(response -> {
                     if (response.isTimedOut()) {
@@ -246,7 +247,7 @@ void start() {
                             Detector detector = Detector.docParse(xcp, hit.getId(), hit.getVersion());
                             joinEngine.onSearchDetectorResponse(detector, finding);
                         } catch (Exception e) {
-                            log.error("IOException for request {}", searchRequest.toString(), e);
+                            log.error("Exception for request {}", searchRequest.toString(), e);
                             onFailures(e);
                         }
                     } else {
@@ -365,54 +366,49 @@ public void getTimestampFeature(String detectorType, Map<String, List<String>> c
                         if (response.getHits().getHits().length == 0) {
                             onFailures(new ResourceNotFoundException(
                                     "Failed to find hits in metadata index for finding id {}", request.getFinding().getId()));
-                        }
-
-                        String id = response.getHits().getHits()[0].getId();
-                        Map<String, Object> hitSource = response.getHits().getHits()[0].getSourceAsMap();
-                        long scoreTimestamp = (long) hitSource.get("scoreTimestamp");
+                        } else {
+                            String id = response.getHits().getHits()[0].getId();
+                            Map<String, Object> hitSource = response.getHits().getHits()[0].getSourceAsMap();
+                            long scoreTimestamp = (long) hitSource.get("scoreTimestamp");
 
-                        long newScoreTimestamp = findingTimestamp - CorrelationIndices.FIXED_HISTORICAL_INTERVAL;
-                        if (newScoreTimestamp > scoreTimestamp) {
-                            try {
+                            long newScoreTimestamp = findingTimestamp - CorrelationIndices.FIXED_HISTORICAL_INTERVAL;
+                            if (newScoreTimestamp > scoreTimestamp) {
                                 IndexRequest scoreIndexRequest = getCorrelationMetadataIndexRequest(id, newScoreTimestamp);
 
                                 client.index(scoreIndexRequest, ActionListener.wrap(indexResponse -> {
-                                    SearchRequest searchRequest =  getSearchLogTypeIndexRequest();
+                                    SearchRequest searchRequest = getSearchLogTypeIndexRequest();
 
                                     client.search(searchRequest, ActionListener.wrap(searchResponse -> {
-                                            if (searchResponse.isTimedOut()) {
-                                                onFailures(new OpenSearchStatusException("Search request timed out", RestStatus.REQUEST_TIMEOUT));
-                                            }
+                                        if (searchResponse.isTimedOut()) {
+                                            onFailures(new OpenSearchStatusException("Search request timed out", RestStatus.REQUEST_TIMEOUT));
+                                        }
 
-                                            SearchHit[] hits = searchResponse.getHits().getHits();
-                                            Map<String, CustomLogType> logTypes = new HashMap<>();
-                                            for (SearchHit hit : hits) {
-                                                Map<String, Object> sourceMap = hit.getSourceAsMap();
-                                                logTypes.put(sourceMap.get("name").toString(),
-                                                        new CustomLogType(sourceMap));
-                                            }
+                                        SearchHit[] hits = searchResponse.getHits().getHits();
+                                        Map<String, CustomLogType> logTypes = new HashMap<>();
+                                        for (SearchHit hit : hits) {
+                                            Map<String, Object> sourceMap = hit.getSourceAsMap();
+                                            logTypes.put(sourceMap.get("name").toString(), new CustomLogType(sourceMap));
+                                        }
 
-                                            if (correlatedFindings != null) {
-                                                if (correlatedFindings.isEmpty()) {
-                                                    vectorEmbeddingsEngine.insertOrphanFindings(detectorType, request.getFinding(), Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes);
-                                                }
-                                                for (Map.Entry<String, List<String>> correlatedFinding : correlatedFindings.entrySet()) {
-                                                    vectorEmbeddingsEngine.insertCorrelatedFindings(detectorType, request.getFinding(), correlatedFinding.getKey(), correlatedFinding.getValue(),
-                                                            Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), correlationRules, logTypes);
-                                                }
-                                            } else {
-                                                vectorEmbeddingsEngine.insertOrphanFindings(detectorType, orphanFinding, Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes);
+                                        if (correlatedFindings != null) {
+                                            if (correlatedFindings.isEmpty()) {
+                                                vectorEmbeddingsEngine.insertOrphanFindings(detectorType, request.getFinding(), Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes);
                                             }
-                                        }, this::onFailures));
+                                            for (Map.Entry<String, List<String>> correlatedFinding : correlatedFindings.entrySet()) {
+                                                vectorEmbeddingsEngine.insertCorrelatedFindings(detectorType, request.getFinding(), correlatedFinding.getKey(), correlatedFinding.getValue(),
+                                                        Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), correlationRules, logTypes);
+                                            }
+                                        } else {
+                                            vectorEmbeddingsEngine.insertOrphanFindings(detectorType, orphanFinding, Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes);
+                                        }
+                                    }, this::onFailures));
                                 }, this::onFailures));
-                            } catch (Exception ex) {
-                                onFailures(ex);
-                            }
-                        } else {
-                            float timestampFeature = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue();
+                            } else {
+                                float timestampFeature = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue();
 
-                            SearchRequest searchRequest = getSearchLogTypeIndexRequest();
-                            insertFindings(timestampFeature, searchRequest, correlatedFindings, detectorType, correlationRules, orphanFinding);
+                                SearchRequest searchRequest = getSearchLogTypeIndexRequest();
+                                insertFindings(timestampFeature, searchRequest, correlatedFindings, detectorType, correlationRules, orphanFinding);
+                            }
                         }
                     }, this::onFailures));
                 }
@@ -431,6 +427,7 @@ private SearchRequest getSearchLogTypeIndexRequest() {
             SearchRequest searchRequest = new SearchRequest();
             searchRequest.indices(LogTypeService.LOG_TYPE_INDEX);
             searchRequest.source(searchSourceBuilder);
+            searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
             return searchRequest;
         }
 
@@ -446,6 +443,7 @@ private IndexRequest getCorrelationMetadataIndexRequest(String id, long newScore
                     .timeout(indexTimeout)
                     .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
         }
+
         private void insertFindings(float timestampFeature, SearchRequest searchRequest, Map<String, List<String>> correlatedFindings, String detectorType, List<String> correlationRules, Finding orphanFinding) {
             client.search(searchRequest, ActionListener.wrap(response -> {
                 if (response.isTimedOut()) {
@@ -485,6 +483,7 @@ private SearchRequest getSearchMetadataIndexRequest() {
             searchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX);
             searchRequest.source(searchSourceBuilder);
             searchRequest.preference(Preference.PRIMARY_FIRST.type());
+            searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
 
             return searchRequest;
         }

From 87cf6ca8979286b308ce8166f301ceab34edcd07 Mon Sep 17 00:00:00 2001
From: Megha Goyal <goyamegh@amazon.com>
Date: Fri, 8 Mar 2024 14:00:57 -0800
Subject: [PATCH 3/4] Fix logging and exception messages

Signed-off-by: Megha Goyal <goyamegh@amazon.com>
---
 .../correlation/VectorEmbeddingsEngine.java    | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java b/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java
index d3d31c7df..88d62006f 100644
--- a/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java
+++ b/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java
@@ -32,6 +32,7 @@
 import org.opensearch.securityanalytics.transport.TransportCorrelateFindingAction;
 import org.opensearch.securityanalytics.util.CorrelationIndices;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -197,10 +198,8 @@ public void insertCorrelatedFindings(String detectorType, Finding finding, Strin
 
     public void insertOrphanFindings(String detectorType, Finding finding, float timestampFeature, Map<String, CustomLogType> logTypes) {
         if (logTypes.get(detectorType) == null ) {
-            log.debug("[PERF-DEBUG] insertOrphanFindings detector type {} {}", detectorType, finding.getId());
-            for (String key : logTypes.keySet()) {
-                log.debug("[PERF-DEBUG] keys {}", key);
-            }
+            log.debug("Missing detector type {} in the log types index for finding id {}. Keys in the index: {}",
+                    detectorType, finding.getId(), Arrays.toString(logTypes.keySet().toArray()));
             onFailure(new OpenSearchStatusException("insertOrphanFindings null log types for detector type: " + detectorType, RestStatus.INTERNAL_SERVER_ERROR));
         }
 
@@ -260,7 +259,8 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
                                 onFailure(ex);
                             }
                         } else {
-                            onFailure(new OpenSearchStatusException(indexResponse.toString(), RestStatus.INTERNAL_SERVER_ERROR));
+                            onFailure(new OpenSearchStatusException("Indexing failed with response {} ",
+                                    indexResponse.status(), indexResponse.toString()));
                         }
                     }, this::onFailure));
                 } else {
@@ -306,7 +306,8 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
                                     onFailure(ex);
                                 }
                             } else {
-                                onFailure(new OpenSearchStatusException(indexResponse.toString(), RestStatus.INTERNAL_SERVER_ERROR));
+                                onFailure(new OpenSearchStatusException("Indexing failed with response {} ",
+                                        indexResponse.status(), indexResponse.toString()));
                             }
                         }, this::onFailure));
                     } else {
@@ -418,7 +419,8 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
                                                 onFailure(ex);
                                             }
                                         } else {
-                                            onFailure(new OpenSearchStatusException("Indexing failed", RestStatus.INTERNAL_SERVER_ERROR));
+                                            onFailure(new OpenSearchStatusException("Indexing failed with response {} ",
+                                                    indexResponse.status(), indexResponse.toString()));
                                         }
                                     }, this::onFailure));
                                 } catch (Exception ex) {
@@ -444,7 +446,7 @@ private void indexCorrelatedFindings(XContentBuilder builder) {
             if (response.status().equals(RestStatus.CREATED)) {
                 correlateFindingAction.onOperation();
             } else {
-                onFailure(new OpenSearchStatusException(response.toString(), RestStatus.INTERNAL_SERVER_ERROR));
+                onFailure(new OpenSearchStatusException("Indexing failed with response {} ", response.status(), response.toString()));
             }
         }, this::onFailure));
     }

From 7788f847596968d1b9ddc5fabdf7d7dc85125722 Mon Sep 17 00:00:00 2001
From: Megha Goyal <goyamegh@amazon.com>
Date: Fri, 8 Mar 2024 14:06:54 -0800
Subject: [PATCH 4/4] Change search timeout to 30 seconds

Signed-off-by: Megha Goyal <goyamegh@amazon.com>
---
 .../securityanalytics/correlation/JoinEngine.java    | 12 ++++++------
 .../correlation/VectorEmbeddingsEngine.java          |  6 +++---
 .../transport/TransportCorrelateFindingAction.java   |  6 +++---
 3 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java b/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java
index 0d373589e..3b4314e12 100644
--- a/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java
+++ b/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java
@@ -133,7 +133,7 @@ private void generateAutoCorrelations(Detector detector, Finding finding) throws
                 searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(logTypeName));
                 searchRequest.source(sourceBuilder);
                 searchRequest.preference(Preference.PRIMARY_FIRST.type());
-                searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
+                searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
                 mSearchRequest.add(searchRequest);
             }
 
@@ -216,7 +216,7 @@ private void onAutoCorrelations(Detector detector, Finding finding, Map<String,
         searchRequest.indices(CorrelationRule.CORRELATION_RULE_INDEX);
         searchRequest.source(searchSourceBuilder);
         searchRequest.preference(Preference.PRIMARY_FIRST.type());
-        searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
+        searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
 
         client.search(searchRequest, ActionListener.wrap(response -> {
             if (response.isTimedOut()) {
@@ -280,7 +280,7 @@ private void getValidDocuments(String detectorType, List<String> indices, List<C
                 searchRequest.indices(indices.toArray(new String[]{}));
                 searchRequest.source(searchSourceBuilder);
                 searchRequest.preference(Preference.PRIMARY_FIRST.type());
-                searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
+                searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
 
                 validCorrelationRules.add(rule);
                 validFields.add(query.get().getField());
@@ -381,7 +381,7 @@ private void searchFindingsByTimestamp(String detectorType, Map<String, List<Cor
             searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(categoryToQueries.getKey()));
             searchRequest.source(searchSourceBuilder);
             searchRequest.preference(Preference.PRIMARY_FIRST.type());
-            searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
+            searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
             mSearchRequest.add(searchRequest);
             categoryToQueriesPairs.add(Pair.of(categoryToQueries.getKey(), categoryToQueries.getValue()));
         }
@@ -446,7 +446,7 @@ private void searchDocsWithFilterKeys(String detectorType, Map<String, DocSearch
             searchRequest.indices(docSearchCriteria.getValue().indices.toArray(new String[]{}));
             searchRequest.source(searchSourceBuilder);
             searchRequest.preference(Preference.PRIMARY_FIRST.type());
-            searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
+            searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
 
             categories.add(docSearchCriteria.getKey());
             mSearchRequest.add(searchRequest);
@@ -508,7 +508,7 @@ private void getCorrelatedFindings(String detectorType, Map<String, List<String>
             searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(relatedDocIds.getKey()));
             searchRequest.source(searchSourceBuilder);
             searchRequest.preference(Preference.PRIMARY_FIRST.type());
-            searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
+            searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
 
             categories.add(relatedDocIds.getKey());
             mSearchRequest.add(searchRequest);
diff --git a/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java b/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java
index 88d62006f..78f7dc765 100644
--- a/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java
+++ b/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java
@@ -95,7 +95,7 @@ public void insertCorrelatedFindings(String detectorType, Finding finding, Strin
                 request.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
                 request.source(searchSourceBuilder);
                 request.preference(Preference.PRIMARY_FIRST.type());
-                request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
+                request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
 
                 mSearchRequest.add(request);
             }
@@ -333,7 +333,7 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
                         request.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
                         request.source(searchSourceBuilder);
                         request.preference(Preference.PRIMARY_FIRST.type());
-                        request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
+                        request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
 
                         client.search(request, ActionListener.wrap(searchResponse -> {
                             if (searchResponse.isTimedOut()) {
@@ -468,7 +468,7 @@ private SearchRequest getSearchMetadataIndexRequest(String detectorType, Finding
         searchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX);
         searchRequest.source(searchSourceBuilder);
         searchRequest.preference(Preference.PRIMARY_FIRST.type());
-        searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
+        searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
         return searchRequest;
     }
 
diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java
index aaaca8489..910794556 100644
--- a/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java
+++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java
@@ -228,7 +228,7 @@ void start() {
                 searchRequest.indices(Detector.DETECTORS_INDEX);
                 searchRequest.source(searchSourceBuilder);
                 searchRequest.preference(Preference.PRIMARY_FIRST.type());
-                searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
+                searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
 
                 client.search(searchRequest, ActionListener.wrap(response -> {
                     if (response.isTimedOut()) {
@@ -427,7 +427,7 @@ private SearchRequest getSearchLogTypeIndexRequest() {
             SearchRequest searchRequest = new SearchRequest();
             searchRequest.indices(LogTypeService.LOG_TYPE_INDEX);
             searchRequest.source(searchSourceBuilder);
-            searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
+            searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
             return searchRequest;
         }
 
@@ -483,7 +483,7 @@ private SearchRequest getSearchMetadataIndexRequest() {
             searchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX);
             searchRequest.source(searchSourceBuilder);
             searchRequest.preference(Preference.PRIMARY_FIRST.type());
-            searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L));
+            searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
 
             return searchRequest;
         }