-
Notifications
You must be signed in to change notification settings - Fork 75
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add search request timeouts for correlations workflows #893
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -94,6 +94,7 @@ | |
request.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP); | ||
request.source(searchSourceBuilder); | ||
request.preference(Preference.PRIMARY_FIRST.type()); | ||
request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L)); | ||
Check warning on line 97 in src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java#L97
|
||
|
||
mSearchRequest.add(request); | ||
} | ||
|
@@ -195,6 +196,14 @@ | |
} | ||
|
||
public void insertOrphanFindings(String detectorType, Finding finding, float timestampFeature, Map<String, CustomLogType> logTypes) { | ||
if (logTypes.get(detectorType) == null ) { | ||
log.debug("[PERF-DEBUG] insertOrphanFindings detector type {} {}", detectorType, finding.getId()); | ||
Check warning on line 200 in src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java#L200
|
||
for (String key : logTypes.keySet()) { | ||
log.debug("[PERF-DEBUG] keys {}", key); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Plz remove perf_debug prefix in all logs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed logging. |
||
} | ||
onFailure(new OpenSearchStatusException("insertOrphanFindings null log types for detector type: " + detectorType, RestStatus.INTERNAL_SERVER_ERROR)); | ||
Check warning on line 204 in src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java#L202-L204
|
||
} | ||
|
||
SearchRequest searchRequest = getSearchMetadataIndexRequest(detectorType, finding, logTypes); | ||
Map<String, Object> tags = logTypes.get(detectorType).getTags(); | ||
String correlationId = tags.get("correlation_id").toString(); | ||
|
@@ -323,6 +332,7 @@ | |
request.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP); | ||
request.source(searchSourceBuilder); | ||
request.preference(Preference.PRIMARY_FIRST.type()); | ||
request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L)); | ||
Check warning on line 335 in src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java#L335
|
||
|
||
client.search(request, ActionListener.wrap(searchResponse -> { | ||
if (searchResponse.isTimedOut()) { | ||
|
@@ -407,6 +417,8 @@ | |
} catch (Exception ex) { | ||
onFailure(ex); | ||
} | ||
} else { | ||
onFailure(new OpenSearchStatusException("Indexing failed", RestStatus.INTERNAL_SERVER_ERROR)); | ||
Check warning on line 421 in src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java#L421
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can log message convey that search timed out hence indexing failed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed the messaging to include the response itself as timeout can be one of the responses. |
||
} | ||
}, this::onFailure)); | ||
} catch (Exception ex) { | ||
|
@@ -454,6 +466,7 @@ | |
searchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX); | ||
searchRequest.source(searchSourceBuilder); | ||
searchRequest.preference(Preference.PRIMARY_FIRST.type()); | ||
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L)); | ||
Check warning on line 469 in src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java#L469
|
||
return searchRequest; | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -172,13 +172,13 @@ | |
correlateFindingAction.onFailures(new OpenSearchStatusException("Failed to create correlation Index", RestStatus.INTERNAL_SERVER_ERROR)); | ||
} | ||
}, correlateFindingAction::onFailures)); | ||
} catch (IOException ex) { | ||
} catch (Exception ex) { | ||
Check warning on line 175 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L175
|
||
correlateFindingAction.onFailures(ex); | ||
} | ||
} else { | ||
correlateFindingAction.start(); | ||
} | ||
} catch (IOException e) { | ||
} catch (Exception e) { | ||
Check warning on line 181 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L181
|
||
throw new SecurityAnalyticsException("Unknown exception occurred", RestStatus.INTERNAL_SERVER_ERROR, e); | ||
} | ||
} | ||
|
@@ -228,6 +228,7 @@ | |
searchRequest.indices(Detector.DETECTORS_INDEX); | ||
searchRequest.source(searchSourceBuilder); | ||
searchRequest.preference(Preference.PRIMARY_FIRST.type()); | ||
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L)); | ||
Check warning on line 231 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L231
|
||
|
||
client.search(searchRequest, ActionListener.wrap(response -> { | ||
if (response.isTimedOut()) { | ||
|
@@ -245,8 +246,8 @@ | |
); | ||
Detector detector = Detector.docParse(xcp, hit.getId(), hit.getVersion()); | ||
joinEngine.onSearchDetectorResponse(detector, finding); | ||
} catch (IOException e) { | ||
log.error("IOException for request {}", searchRequest.toString(), e); | ||
} catch (Exception e) { | ||
log.error("Exception for request {}", searchRequest.toString(), e); | ||
Check warning on line 250 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L249-L250
|
||
onFailures(e); | ||
} | ||
} else { | ||
|
@@ -277,7 +278,7 @@ | |
} else { | ||
getTimestampFeature(detectorType, correlatedFindings, null, correlationRules); | ||
} | ||
} catch (IOException ex) { | ||
} catch (Exception ex) { | ||
Check warning on line 281 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L281
|
||
onFailures(ex); | ||
} | ||
} | ||
|
@@ -353,7 +354,8 @@ | |
}, this::onFailures)); | ||
}, this::onFailures)); | ||
} else { | ||
log.error(new OpenSearchStatusException("Failed to create correlation metadata Index", RestStatus.INTERNAL_SERVER_ERROR)); | ||
Exception e = new OpenSearchStatusException("Failed to create correlation metadata Index", RestStatus.INTERNAL_SERVER_ERROR); | ||
onFailures(e); | ||
Check warning on line 358 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L357-L358
|
||
} | ||
}, this::onFailures)); | ||
} else { | ||
|
@@ -364,54 +366,49 @@ | |
if (response.getHits().getHits().length == 0) { | ||
onFailures(new ResourceNotFoundException( | ||
"Failed to find hits in metadata index for finding id {}", request.getFinding().getId())); | ||
} | ||
|
||
String id = response.getHits().getHits()[0].getId(); | ||
Map<String, Object> hitSource = response.getHits().getHits()[0].getSourceAsMap(); | ||
long scoreTimestamp = (long) hitSource.get("scoreTimestamp"); | ||
} else { | ||
String id = response.getHits().getHits()[0].getId(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Empty check?(i see u didnt change the code but better to add) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added this check intentionally as I was observing that the flow was continuing even when we throw the exception in the if block above. So, putting this in the else block will avoid the ArrayOutOfBoundsException then. |
||
Map<String, Object> hitSource = response.getHits().getHits()[0].getSourceAsMap(); | ||
long scoreTimestamp = (long) hitSource.get("scoreTimestamp"); | ||
Check warning on line 372 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L370-L372
|
||
|
||
long newScoreTimestamp = findingTimestamp - CorrelationIndices.FIXED_HISTORICAL_INTERVAL; | ||
if (newScoreTimestamp > scoreTimestamp) { | ||
try { | ||
long newScoreTimestamp = findingTimestamp - CorrelationIndices.FIXED_HISTORICAL_INTERVAL; | ||
Check warning on line 374 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L374
|
||
if (newScoreTimestamp > scoreTimestamp) { | ||
IndexRequest scoreIndexRequest = getCorrelationMetadataIndexRequest(id, newScoreTimestamp); | ||
|
||
client.index(scoreIndexRequest, ActionListener.wrap(indexResponse -> { | ||
SearchRequest searchRequest = getSearchLogTypeIndexRequest(); | ||
SearchRequest searchRequest = getSearchLogTypeIndexRequest(); | ||
Check warning on line 379 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L379
|
||
|
||
client.search(searchRequest, ActionListener.wrap(searchResponse -> { | ||
if (searchResponse.isTimedOut()) { | ||
onFailures(new OpenSearchStatusException("Search request timed out", RestStatus.REQUEST_TIMEOUT)); | ||
} | ||
if (searchResponse.isTimedOut()) { | ||
onFailures(new OpenSearchStatusException("Search request timed out", RestStatus.REQUEST_TIMEOUT)); | ||
Check warning on line 383 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L383
|
||
} | ||
|
||
SearchHit[] hits = searchResponse.getHits().getHits(); | ||
Map<String, CustomLogType> logTypes = new HashMap<>(); | ||
for (SearchHit hit : hits) { | ||
Map<String, Object> sourceMap = hit.getSourceAsMap(); | ||
logTypes.put(sourceMap.get("name").toString(), | ||
new CustomLogType(sourceMap)); | ||
} | ||
SearchHit[] hits = searchResponse.getHits().getHits(); | ||
Map<String, CustomLogType> logTypes = new HashMap<>(); | ||
Check warning on line 387 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L386-L387
|
||
for (SearchHit hit : hits) { | ||
Map<String, Object> sourceMap = hit.getSourceAsMap(); | ||
logTypes.put(sourceMap.get("name").toString(), new CustomLogType(sourceMap)); | ||
Check warning on line 390 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L389-L390
|
||
} | ||
|
||
if (correlatedFindings != null) { | ||
if (correlatedFindings.isEmpty()) { | ||
vectorEmbeddingsEngine.insertOrphanFindings(detectorType, request.getFinding(), Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes); | ||
} | ||
for (Map.Entry<String, List<String>> correlatedFinding : correlatedFindings.entrySet()) { | ||
vectorEmbeddingsEngine.insertCorrelatedFindings(detectorType, request.getFinding(), correlatedFinding.getKey(), correlatedFinding.getValue(), | ||
Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), correlationRules, logTypes); | ||
} | ||
} else { | ||
vectorEmbeddingsEngine.insertOrphanFindings(detectorType, orphanFinding, Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes); | ||
if (correlatedFindings != null) { | ||
if (correlatedFindings.isEmpty()) { | ||
vectorEmbeddingsEngine.insertOrphanFindings(detectorType, request.getFinding(), Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes); | ||
Check warning on line 395 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L395
|
||
} | ||
}, this::onFailures)); | ||
for (Map.Entry<String, List<String>> correlatedFinding : correlatedFindings.entrySet()) { | ||
vectorEmbeddingsEngine.insertCorrelatedFindings(detectorType, request.getFinding(), correlatedFinding.getKey(), correlatedFinding.getValue(), | ||
Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), correlationRules, logTypes); | ||
} | ||
Check warning on line 400 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L398-L400
|
||
} else { | ||
vectorEmbeddingsEngine.insertOrphanFindings(detectorType, orphanFinding, Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes); | ||
Check warning on line 402 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L402
|
||
} | ||
}, this::onFailures)); | ||
Check warning on line 404 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L404
|
||
}, this::onFailures)); | ||
} catch (Exception ex) { | ||
onFailures(ex); | ||
} | ||
} else { | ||
float timestampFeature = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue(); | ||
} else { | ||
float timestampFeature = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue(); | ||
Check warning on line 407 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L406-L407
|
||
|
||
SearchRequest searchRequest = getSearchLogTypeIndexRequest(); | ||
insertFindings(timestampFeature, searchRequest, correlatedFindings, detectorType, correlationRules, orphanFinding); | ||
SearchRequest searchRequest = getSearchLogTypeIndexRequest(); | ||
insertFindings(timestampFeature, searchRequest, correlatedFindings, detectorType, correlationRules, orphanFinding); | ||
Check warning on line 410 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L409-L410
|
||
} | ||
} | ||
}, this::onFailures)); | ||
} | ||
|
@@ -430,6 +427,7 @@ | |
SearchRequest searchRequest = new SearchRequest(); | ||
searchRequest.indices(LogTypeService.LOG_TYPE_INDEX); | ||
searchRequest.source(searchSourceBuilder); | ||
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L)); | ||
Check warning on line 430 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L430
|
||
return searchRequest; | ||
} | ||
|
||
|
@@ -439,13 +437,13 @@ | |
scoreBuilder.field("root", false); | ||
scoreBuilder.endObject(); | ||
|
||
IndexRequest scoreIndexRequest = new IndexRequest(CorrelationIndices.CORRELATION_METADATA_INDEX) | ||
return new IndexRequest(CorrelationIndices.CORRELATION_METADATA_INDEX) | ||
Check warning on line 440 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L440
|
||
.id(id) | ||
.source(scoreBuilder) | ||
.timeout(indexTimeout) | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); | ||
return scoreIndexRequest; | ||
} | ||
|
||
private void insertFindings(float timestampFeature, SearchRequest searchRequest, Map<String, List<String>> correlatedFindings, String detectorType, List<String> correlationRules, Finding orphanFinding) { | ||
client.search(searchRequest, ActionListener.wrap(response -> { | ||
if (response.isTimedOut()) { | ||
|
@@ -485,6 +483,7 @@ | |
searchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX); | ||
searchRequest.source(searchSourceBuilder); | ||
searchRequest.preference(Preference.PRIMARY_FIRST.type()); | ||
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(10L)); | ||
Check warning on line 486 in src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java#L486
|
||
|
||
return searchRequest; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why such an aggressive timeout
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We were seeing correlations tasks hanging around due to searches. Since these are system indices, we have configured a standard 10 second timeout for now, but we can discuss and make it configurable in future.