Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add search request timeouts for correlations workflows #893

Merged
merged 4 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.lucene.search.join.ScoreMode;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.alerting.model.DocLevelQuery;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.search.MultiSearchRequest;
Expand Down Expand Up @@ -132,6 +133,7 @@
searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(logTypeName));
searchRequest.source(sourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

Check warning on line 136 in src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java#L136

Added line #L136 was not covered by tests
mSearchRequest.add(searchRequest);
}

Expand Down Expand Up @@ -214,6 +216,7 @@
searchRequest.indices(CorrelationRule.CORRELATION_RULE_INDEX);
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

Check warning on line 219 in src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java#L219

Added line #L219 was not covered by tests

client.search(searchRequest, ActionListener.wrap(response -> {
if (response.isTimedOut()) {
Expand Down Expand Up @@ -277,6 +280,7 @@
searchRequest.indices(indices.toArray(new String[]{}));
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

Check warning on line 283 in src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java#L283

Added line #L283 was not covered by tests

validCorrelationRules.add(rule);
validFields.add(query.get().getField());
Expand Down Expand Up @@ -377,6 +381,7 @@
searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(categoryToQueries.getKey()));
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

Check warning on line 384 in src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java#L384

Added line #L384 was not covered by tests
mSearchRequest.add(searchRequest);
categoryToQueriesPairs.add(Pair.of(categoryToQueries.getKey(), categoryToQueries.getValue()));
}
Expand Down Expand Up @@ -441,6 +446,7 @@
searchRequest.indices(docSearchCriteria.getValue().indices.toArray(new String[]{}));
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

Check warning on line 449 in src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java#L449

Added line #L449 was not covered by tests

categories.add(docSearchCriteria.getKey());
mSearchRequest.add(searchRequest);
Expand Down Expand Up @@ -502,6 +508,7 @@
searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(relatedDocIds.getKey()));
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

Check warning on line 511 in src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java#L511

Added line #L511 was not covered by tests

categories.add(relatedDocIds.getKey());
mSearchRequest.add(searchRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.opensearch.securityanalytics.transport.TransportCorrelateFindingAction;
import org.opensearch.securityanalytics.util.CorrelationIndices;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -94,6 +95,7 @@
request.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
request.source(searchSourceBuilder);
request.preference(Preference.PRIMARY_FIRST.type());
request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

Check warning on line 98 in src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java#L98

Added line #L98 was not covered by tests

mSearchRequest.add(request);
}
Expand Down Expand Up @@ -195,6 +197,12 @@
}

public void insertOrphanFindings(String detectorType, Finding finding, float timestampFeature, Map<String, CustomLogType> logTypes) {
if (logTypes.get(detectorType) == null ) {
log.debug("Missing detector type {} in the log types index for finding id {}. Keys in the index: {}",
detectorType, finding.getId(), Arrays.toString(logTypes.keySet().toArray()));
onFailure(new OpenSearchStatusException("insertOrphanFindings null log types for detector type: " + detectorType, RestStatus.INTERNAL_SERVER_ERROR));

Check warning on line 203 in src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java#L201-L203

Added lines #L201 - L203 were not covered by tests
}

SearchRequest searchRequest = getSearchMetadataIndexRequest(detectorType, finding, logTypes);
Map<String, Object> tags = logTypes.get(detectorType).getTags();
String correlationId = tags.get("correlation_id").toString();
Expand Down Expand Up @@ -251,7 +259,8 @@
onFailure(ex);
}
} else {
onFailure(new OpenSearchStatusException(indexResponse.toString(), RestStatus.INTERNAL_SERVER_ERROR));
onFailure(new OpenSearchStatusException("Indexing failed with response {} ",
indexResponse.status(), indexResponse.toString()));

Check warning on line 263 in src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java#L262-L263

Added lines #L262 - L263 were not covered by tests
}
}, this::onFailure));
} else {
Expand Down Expand Up @@ -297,7 +306,8 @@
onFailure(ex);
}
} else {
onFailure(new OpenSearchStatusException(indexResponse.toString(), RestStatus.INTERNAL_SERVER_ERROR));
onFailure(new OpenSearchStatusException("Indexing failed with response {} ",
indexResponse.status(), indexResponse.toString()));

Check warning on line 310 in src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java#L309-L310

Added lines #L309 - L310 were not covered by tests
}
}, this::onFailure));
} else {
Expand All @@ -323,6 +333,7 @@
request.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
request.source(searchSourceBuilder);
request.preference(Preference.PRIMARY_FIRST.type());
request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

Check warning on line 336 in src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java#L336

Added line #L336 was not covered by tests

client.search(request, ActionListener.wrap(searchResponse -> {
if (searchResponse.isTimedOut()) {
Expand Down Expand Up @@ -407,6 +418,9 @@
} catch (Exception ex) {
onFailure(ex);
}
} else {
onFailure(new OpenSearchStatusException("Indexing failed with response {} ",
indexResponse.status(), indexResponse.toString()));

Check warning on line 423 in src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java#L422-L423

Added lines #L422 - L423 were not covered by tests
}
}, this::onFailure));
} catch (Exception ex) {
Expand All @@ -432,7 +446,7 @@
if (response.status().equals(RestStatus.CREATED)) {
correlateFindingAction.onOperation();
} else {
onFailure(new OpenSearchStatusException(response.toString(), RestStatus.INTERNAL_SERVER_ERROR));
onFailure(new OpenSearchStatusException("Indexing failed with response {} ", response.status(), response.toString()));

Check warning on line 449 in src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java#L449

Added line #L449 was not covered by tests
}
}, this::onFailure));
}
Expand All @@ -454,6 +468,7 @@
searchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX);
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

Check warning on line 471 in src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java#L471

Added line #L471 was not covered by tests
return searchRequest;
}

Expand Down
Loading
Loading