-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
EQL: add support for partial shard results #116388
Changes from 30 commits
4aa4a97
95f9a1b
d14c1f6
b72fd37
a86ab6a
18cfa60
8777fc8
4e63b3c
b6501cc
f052782
3e5439e
90fc499
35eb31e
b003c1c
8978a01
4c421c0
2ab3972
3ab9740
8207ea0
f3a1a65
c54a0c5
a8f5fb5
fcfa021
706935c
545e614
32a7aef
1e97b85
1da924c
3ebacb8
ed6b9a7
9f9eba8
7efff36
045d8da
672e512
e1e83a6
625acf4
7f36a69
984fe02
a1c903f
f58fd1c
3aba03a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pr: 116388 | ||
summary: Add support for partial shard results | ||
area: EQL | ||
type: enhancement | ||
issues: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2453,7 +2453,7 @@ public static void afterClass() throws Exception { | |
/** | ||
* After the cluster is stopped, there are a few netty threads that can linger, so we make sure we don't leak any tasks on them. | ||
*/ | ||
static void awaitGlobalNettyThreadsFinish() throws Exception { | ||
public static void awaitGlobalNettyThreadsFinish() throws Exception { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I didn't find an obvious reason for this change. Can you shed some light, please? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's a leftover; I don't need it anymore. Reverting. |
||
// Don't use GlobalEventExecutor#awaitInactivity. It will waste up to 1s for every call and we expect no tasks queued for it | ||
// except for the odd scheduled shutdown task. | ||
assertBusy(() -> assertEquals(0, GlobalEventExecutor.INSTANCE.pendingTasks())); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,9 @@ | |
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.hamcrest.Matchers.greaterThan; | ||
import static org.hamcrest.Matchers.is; | ||
|
||
public abstract class BaseEqlSpecTestCase extends RemoteClusterAwareEqlRestTestCase { | ||
|
||
protected static final String PARAM_FORMATTING = "%2$s"; | ||
|
@@ -52,6 +55,9 @@ public abstract class BaseEqlSpecTestCase extends RemoteClusterAwareEqlRestTestC | |
*/ | ||
private final int size; | ||
private final int maxSamplesPerKey; | ||
private final Boolean allowPartialSearchResults; | ||
private final Boolean allowPartialSequenceResults; | ||
private final Boolean expectShardFailures; | ||
|
||
@Before | ||
public void setup() throws Exception { | ||
|
@@ -104,7 +110,16 @@ protected static List<Object[]> asArray(List<EqlSpec> specs) { | |
} | ||
|
||
results.add( | ||
new Object[] { spec.query(), name, spec.expectedEventIds(), spec.joinKeys(), spec.size(), spec.maxSamplesPerKey() } | ||
new Object[] { | ||
spec.query(), | ||
name, | ||
spec.expectedEventIds(), | ||
spec.joinKeys(), | ||
spec.size(), | ||
spec.maxSamplesPerKey(), | ||
spec.allowPartialSearchResults(), | ||
spec.allowPartialSequenceResults(), | ||
spec.expectShardFailures() } | ||
); | ||
} | ||
|
||
|
@@ -118,7 +133,10 @@ protected static List<Object[]> asArray(List<EqlSpec> specs) { | |
List<long[]> eventIds, | ||
String[] joinKeys, | ||
Integer size, | ||
Integer maxSamplesPerKey | ||
Integer maxSamplesPerKey, | ||
Boolean allowPartialSearchResults, | ||
Boolean allowPartialSequenceResults, | ||
Boolean expectShardFailures | ||
) { | ||
this.index = index; | ||
|
||
|
@@ -128,6 +146,9 @@ protected static List<Object[]> asArray(List<EqlSpec> specs) { | |
this.joinKeys = joinKeys; | ||
this.size = size == null ? -1 : size; | ||
this.maxSamplesPerKey = maxSamplesPerKey == null ? -1 : maxSamplesPerKey; | ||
this.allowPartialSearchResults = allowPartialSearchResults; | ||
this.allowPartialSequenceResults = allowPartialSequenceResults; | ||
this.expectShardFailures = expectShardFailures; | ||
} | ||
|
||
public void test() throws Exception { | ||
|
@@ -137,6 +158,7 @@ public void test() throws Exception { | |
private void assertResponse(ObjectPath response) throws Exception { | ||
List<Map<String, Object>> events = response.evaluate("hits.events"); | ||
List<Map<String, Object>> sequences = response.evaluate("hits.sequences"); | ||
Object shardFailures = response.evaluate("shard_failures"); | ||
|
||
if (events != null) { | ||
assertEvents(events); | ||
|
@@ -145,6 +167,7 @@ private void assertResponse(ObjectPath response) throws Exception { | |
} else { | ||
fail("No events or sequences found"); | ||
} | ||
assertShardFailures(shardFailures); | ||
} | ||
|
||
protected ObjectPath runQuery(String index, String query) throws Exception { | ||
|
@@ -163,13 +186,34 @@ protected ObjectPath runQuery(String index, String query) throws Exception { | |
if (maxSamplesPerKey > 0) { | ||
builder.field("max_samples_per_key", maxSamplesPerKey); | ||
} | ||
boolean allowPartialResultsInBody = randomBoolean(); | ||
if (allowPartialSearchResults != null) { | ||
if (allowPartialResultsInBody) { | ||
builder.field("allow_partial_search_results", String.valueOf(allowPartialSearchResults)); | ||
if (allowPartialSequenceResults != null) { | ||
builder.field("allow_partial_sequence_results", String.valueOf(allowPartialSequenceResults)); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What happens if a user provides only There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
} | ||
} else if (randomBoolean()) { | ||
builder.field("allow_partial_search_results", randomBoolean()); | ||
} | ||
builder.endObject(); | ||
|
||
Request request = new Request("POST", "/" + index + "/_eql/search"); | ||
Boolean ccsMinimizeRoundtrips = ccsMinimizeRoundtrips(); | ||
if (ccsMinimizeRoundtrips != null) { | ||
request.addParameter("ccs_minimize_roundtrips", ccsMinimizeRoundtrips.toString()); | ||
} | ||
if (allowPartialSearchResults != null) { | ||
if (allowPartialResultsInBody == false) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In other words, randomize any combination of the request parameters' presence and values in the two places where we allow them. |
||
request.addParameter("allow_partial_search_results", String.valueOf(allowPartialSearchResults)); | ||
if (allowPartialSequenceResults != null) { | ||
request.addParameter("allow_partial_sequence_results", String.valueOf(allowPartialSequenceResults)); | ||
} | ||
} | ||
} else if (randomBoolean()) { | ||
request.addParameter("allow_partial_search_results", String.valueOf(randomBoolean())); | ||
} | ||
int timeout = Math.toIntExact(timeout().millis()); | ||
RequestConfig config = RequestConfig.copy(RequestConfig.DEFAULT) | ||
.setConnectionRequestTimeout(timeout) | ||
|
@@ -182,6 +226,18 @@ protected ObjectPath runQuery(String index, String query) throws Exception { | |
return ObjectPath.createFromResponse(client().performRequest(request)); | ||
} | ||
|
||
private void assertShardFailures(Object shardFailures) { | ||
if (expectShardFailures != null) { | ||
if (expectShardFailures) { | ||
assertNotNull(shardFailures); | ||
List<?> list = (List<?>) shardFailures; | ||
assertThat(list.size(), is(greaterThan(0))); | ||
} else { | ||
assertNull(shardFailures); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If |
||
} | ||
|
||
private void assertEvents(List<Map<String, Object>> events) { | ||
assertNotNull(events); | ||
logger.debug("Events {}", new Object() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,9 +27,23 @@ public EqlMissingEventsSpecTestCase( | |
List<long[]> eventIds, | ||
String[] joinKeys, | ||
Integer size, | ||
Integer maxSamplesPerKey | ||
Integer maxSamplesPerKey, | ||
Boolean allowPartialSearchResults, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It would be great if all the existing EQL tests that have no idea about partial results (all the tests based on BaseEqlSpecTestCase before this PR) completely randomized these two new settings. All of them should behave exactly the same because we have no "broken" shard setup for them, which means that, irrespective of the values provided, the tests should pass. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There is some randomization already, but I'm making it more complete. |
||
Boolean allowPartialSequenceResults, | ||
Boolean expectShardFailures | ||
) { | ||
this(TEST_MISSING_EVENTS_INDEX, query, name, eventIds, joinKeys, size, maxSamplesPerKey); | ||
this( | ||
TEST_MISSING_EVENTS_INDEX, | ||
query, | ||
name, | ||
eventIds, | ||
joinKeys, | ||
size, | ||
maxSamplesPerKey, | ||
allowPartialSearchResults, | ||
allowPartialSequenceResults, | ||
expectShardFailures | ||
); | ||
} | ||
|
||
// constructor for multi-cluster tests | ||
|
@@ -40,9 +54,23 @@ public EqlMissingEventsSpecTestCase( | |
List<long[]> eventIds, | ||
String[] joinKeys, | ||
Integer size, | ||
Integer maxSamplesPerKey | ||
Integer maxSamplesPerKey, | ||
Boolean allowPartialSearchResults, | ||
Boolean allowPartialSequenceResults, | ||
Boolean expectShardFailures | ||
) { | ||
super(index, query, name, eventIds, joinKeys, size, maxSamplesPerKey); | ||
super( | ||
index, | ||
query, | ||
name, | ||
eventIds, | ||
joinKeys, | ||
size, | ||
maxSamplesPerKey, | ||
allowPartialSearchResults, | ||
allowPartialSequenceResults, | ||
expectShardFailures | ||
); | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ES search API supports partial results for async search as well, if I read the documentation correctly. Is there something stopping us from doing the same with EQL?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think when the _search docs mention partial results at the beginning of the page, they refer to something slightly different, i.e. the first part of a response that is still being calculated.
Search results can also be partial because of missing shards, which is the same situation we have in EQL, regardless of whether the request is sync or async.
That said, we definitely need some tests for async EQL queries with `allow_partial_search_results=true`. I'm adding them.