Skip to content

Commit

Permalink
EQL: Adds an ability to start an asynchronous EQL search (#56631)
Browse files Browse the repository at this point in the history
Adds support for async searches to eql search API. This commit is limited to
only submitting search API requests and doesn't provide APIs to get results
nor delete the results. These functions will be added in follow up PRs.

Relates to #49638
  • Loading branch information
imotov authored May 13, 2020
1 parent 5aa36ef commit fcebd4f
Show file tree
Hide file tree
Showing 26 changed files with 1,550 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Validatable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand All @@ -44,13 +45,21 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
private SearchAfterBuilder searchAfterBuilder;
private String query;

// Async settings
private TimeValue waitForCompletionTimeout;
private boolean keepOnCompletion;
private TimeValue keepAlive;

static final String KEY_FILTER = "filter";
static final String KEY_TIMESTAMP_FIELD = "timestamp_field";
static final String KEY_EVENT_CATEGORY_FIELD = "event_category_field";
static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field";
static final String KEY_SIZE = "size";
static final String KEY_SEARCH_AFTER = "search_after";
static final String KEY_QUERY = "query";
static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
static final String KEY_KEEP_ALIVE = "keep_alive";
static final String KEY_KEEP_ON_COMPLETION = "keep_on_completion";

public EqlSearchRequest(String indices, String query) {
indices(indices);
Expand All @@ -75,6 +84,13 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
}

builder.field(KEY_QUERY, query);
if (waitForCompletionTimeout != null) {
builder.field(KEY_WAIT_FOR_COMPLETION_TIMEOUT, waitForCompletionTimeout);
}
if (keepAlive != null) {
builder.field(KEY_KEEP_ALIVE, keepAlive);
}
builder.field(KEY_KEEP_ON_COMPLETION, keepOnCompletion);
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -166,6 +182,32 @@ public EqlSearchRequest query(String query) {
return this;
}

public TimeValue waitForCompletionTimeout() {
return waitForCompletionTimeout;
}

public EqlSearchRequest waitForCompletionTimeout(TimeValue waitForCompletionTimeout) {
this.waitForCompletionTimeout = waitForCompletionTimeout;
return this;
}

public Boolean keepOnCompletion() {
return keepOnCompletion;
}

public void keepOnCompletion(Boolean keepOnCompletion) {
this.keepOnCompletion = keepOnCompletion;
}

public TimeValue keepAlive() {
return keepAlive;
}

public EqlSearchRequest keepAlive(TimeValue keepAlive) {
this.keepAlive = keepAlive;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -183,7 +225,10 @@ public boolean equals(Object o) {
Objects.equals(eventCategoryField, that.eventCategoryField) &&
Objects.equals(implicitJoinKeyField, that.implicitJoinKeyField) &&
Objects.equals(searchAfterBuilder, that.searchAfterBuilder) &&
Objects.equals(query, that.query);
Objects.equals(query, that.query) &&
Objects.equals(waitForCompletionTimeout, that.waitForCompletionTimeout) &&
Objects.equals(keepAlive, that.keepAlive) &&
Objects.equals(keepOnCompletion, that.keepOnCompletion);
}

@Override
Expand All @@ -197,7 +242,10 @@ public int hashCode() {
eventCategoryField,
implicitJoinKeyField,
searchAfterBuilder,
query);
query,
waitForCompletionTimeout,
keepAlive,
keepOnCompletion);
}

public String[] indices() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.InstantiatingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
Expand All @@ -32,43 +33,56 @@
import java.util.List;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class EqlSearchResponse {

private final Hits hits;
private final long tookInMillis;
private final boolean isTimeout;
private final String asyncExecutionId;
private final boolean isRunning;
private final boolean isPartial;

private static final class Fields {
static final String TOOK = "took";
static final String TIMED_OUT = "timed_out";
static final String HITS = "hits";
static final String ID = "id";
static final String IS_RUNNING = "is_running";
static final String IS_PARTIAL = "is_partial";
}

private static final ParseField TOOK = new ParseField(Fields.TOOK);
private static final ParseField TIMED_OUT = new ParseField(Fields.TIMED_OUT);
private static final ParseField HITS = new ParseField(Fields.HITS);
private static final ParseField ID = new ParseField(Fields.ID);
private static final ParseField IS_RUNNING = new ParseField(Fields.IS_RUNNING);
private static final ParseField IS_PARTIAL = new ParseField(Fields.IS_PARTIAL);

private static final ConstructingObjectParser<EqlSearchResponse, Void> PARSER =
new ConstructingObjectParser<>("eql/search_response", true,
args -> {
int i = 0;
Hits hits = (Hits) args[i++];
Long took = (Long) args[i++];
Boolean timeout = (Boolean) args[i];
return new EqlSearchResponse(hits, took, timeout);
});

private static final InstantiatingObjectParser<EqlSearchResponse, Void> PARSER;
static {
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> Hits.fromXContent(p), HITS);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOOK);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), TIMED_OUT);
InstantiatingObjectParser.Builder<EqlSearchResponse, Void> parser =
InstantiatingObjectParser.builder("eql/search_response", true, EqlSearchResponse.class);
parser.declareObject(constructorArg(), (p, c) -> Hits.fromXContent(p), HITS);
parser.declareLong(constructorArg(), TOOK);
parser.declareBoolean(constructorArg(), TIMED_OUT);
parser.declareString(optionalConstructorArg(), ID);
parser.declareBoolean(constructorArg(), IS_RUNNING);
parser.declareBoolean(constructorArg(), IS_PARTIAL);
PARSER = parser.build();
}

public EqlSearchResponse(Hits hits, long tookInMillis, boolean isTimeout) {
public EqlSearchResponse(Hits hits, long tookInMillis, boolean isTimeout, String asyncExecutionId,
boolean isRunning, boolean isPartial) {
super();
this.hits = hits == null ? Hits.EMPTY : hits;
this.tookInMillis = tookInMillis;
this.isTimeout = isTimeout;
this.asyncExecutionId = asyncExecutionId;
this.isRunning = isRunning;
this.isPartial = isPartial;
}

public static EqlSearchResponse fromXContent(XContentParser parser) {
Expand All @@ -87,6 +101,18 @@ public Hits hits() {
return hits;
}

public String id() {
return asyncExecutionId;
}

public boolean isRunning() {
return isRunning;
}

public boolean isPartial() {
return isPartial;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomE
if (randomBoolean()) {
hits = new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Hits(randomEvents(), null, null, totalHits);
}
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean());
if (randomBoolean()) {
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean());
} else {
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean(),
randomAlphaOfLength(10), randomBoolean(), randomBoolean());
}
}

public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomSequencesResponse(TotalHits totalHits) {
Expand All @@ -77,7 +82,12 @@ public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomS
if (randomBoolean()) {
hits = new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Hits(null, seq, null, totalHits);
}
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean());
if (randomBoolean()) {
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean());
} else {
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean(),
randomAlphaOfLength(10), randomBoolean(), randomBoolean());
}
}

public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomCountResponse(TotalHits totalHits) {
Expand All @@ -97,7 +107,12 @@ public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomC
if (randomBoolean()) {
hits = new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Hits(null, null, cn, totalHits);
}
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean());
if (randomBoolean()) {
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean());
} else {
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean(),
randomAlphaOfLength(10), randomBoolean(), randomBoolean());
}
}

public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomInstance(TotalHits totalHits) {
Expand Down
6 changes: 4 additions & 2 deletions docs/reference/eql/eql-search-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,9 @@ the https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
]
}
]
}
},
"is_partial": false,
"is_running": false
}
----
// TESTRESPONSE[s/"took": 6/"took": $body.took/]
// TESTRESPONSE[s/"took": 6/"took": $body.took/]
4 changes: 3 additions & 1 deletion docs/reference/eql/search.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
]
}
]
}
},
"is_partial": false,
"is_running": false
}
----
// TESTRESPONSE[s/"took": 60/"took": $body.took/]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void onResponse(AsyncSearchResponse searchResponse) {
// creates the fallback response if the node crashes/restarts in the middle of the request
// TODO: store intermediate results ?
AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId());
store.storeInitialResponse(docId, searchTask.getOriginHeaders(), initialResp,
store.createResponse(docId, searchTask.getOriginHeaders(), initialResp,
new ActionListener<>() {
@Override
public void onResponse(IndexResponse r) {
Expand Down Expand Up @@ -191,7 +191,7 @@ private void onFinalResponse(CancellableTask submitTask,
}

try {
store.storeFinalResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response,
store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response,
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
Throwable cause = ExceptionsHelper.unwrapCause(exc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ void createIndexIfNecessary(ActionListener<Void> listener) {
* Stores the initial response with the original headers of the authenticated user
* and the expected expiration time.
*/
public void storeInitialResponse(String docId,
Map<String, String> headers,
R response,
ActionListener<IndexResponse> listener) throws IOException {
public void createResponse(String docId,
Map<String, String> headers,
R response,
ActionListener<IndexResponse> listener) throws IOException {
Map<String, Object> source = new HashMap<>();
source.put(HEADERS_FIELD, headers);
source.put(EXPIRATION_TIME_FIELD, response.getExpirationTime());
Expand All @@ -181,10 +181,10 @@ public void storeInitialResponse(String docId,
/**
* Stores the final response if the place-holder document is still present (update).
*/
public void storeFinalResponse(String docId,
Map<String, List<String>> responseHeaders,
R response,
ActionListener<UpdateResponse> listener) throws IOException {
public void updateResponse(String docId,
Map<String, List<String>> responseHeaders,
R response,
ActionListener<UpdateResponse> listener) throws IOException {
Map<String, Object> source = new HashMap<>();
source.put(RESPONSE_HEADERS_FIELD, responseHeaders);
source.put(RESULT_FIELD, encodeResponse(response));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,16 @@ setup:
- match: {hits.total.relation: "eq"}
- match: {hits.events.0._source.user: "SYSTEM"}

---
"Execute some EQL in async mode":
- do:
eql.search:
index: eql_test
wait_for_completion_timeout: "0ms"
body:
query: "process where user = 'SYSTEM'"

- match: {is_running: true}
- match: {is_partial: true}
- is_true: id

Loading

0 comments on commit fcebd4f

Please sign in to comment.