Skip to content

Commit

Permalink
SQL: Introduce an async querying mode for SQL (#73991) (#74643)
Browse files Browse the repository at this point in the history
This adds an async query mode to SQL.
It (re)uses the same request and response async-specific EQL object
parameters.

Also similar to EQL, the running search task can have its state
monitored and canceled and its results stored and deleted, with
intermediary responses not supported (the entire result is available
once search finished).

The async implementation is extended to work with the SQL-specific
text formats (txt, csv, tsv) as well, besides xcontent.

Closes #71041.

(cherry picked from commit 42cc419)
  • Loading branch information
bpintea authored Jun 28, 2021
1 parent add9a9b commit e3f06a6
Show file tree
Hide file tree
Showing 61 changed files with 2,821 additions and 354 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
* 2.0.
*/

package org.elasticsearch.xpack.eql.async;
package org.elasticsearch.xpack.core.async;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.async.AsyncResponse;

import java.io.IOException;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@
* 2.0.
*/

package org.elasticsearch.xpack.eql.async;
package org.elasticsearch.xpack.core.async;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTask;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -71,7 +69,7 @@ public synchronized void removeCompletionListener(ActionListener<Response> liste
/**
* This method is called when the task is finished successfully before unregistering the task and storing the results
*/
protected synchronized void onResponse(Response response) {
public synchronized void onResponse(Response response) {
for (ActionListener<Response> listener : completionListeners) {
listener.onResponse(response);
}
Expand All @@ -80,7 +78,7 @@ protected synchronized void onResponse(Response response) {
/**
* This method is called when the task failed before unregistering the task and storing the results
*/
protected synchronized void onFailure(Exception e) {
public synchronized void onFailure(Exception e) {
for (ActionListener<Response> listener : completionListeners) {
listener.onFailure(e);
}
Expand All @@ -89,7 +87,7 @@ protected synchronized void onFailure(Exception e) {
/**
* Return currently available partial or the final results
*/
protected abstract Response getCurrentResult();
public abstract Response getCurrentResult();

@Override
public void cancelTask(TaskManager taskManager, Runnable runnable, String reason) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.sql;

/**
* Exposes SQL async action names for the RBAC engine
*/
public class SqlAsyncActionNames {
public static final String SQL_ASYNC_GET_RESULT_ACTION_NAME = "indices:data/read/sql/async/get";
public static final String SQL_ASYNC_GET_STATUS_ACTION_NAME = "cluster:monitor/xpack/sql/async/status";
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.eql.async.StoredAsyncResponse;
import org.elasticsearch.xpack.core.async.StoredAsyncResponse;
import org.elasticsearch.xpack.eql.plugin.EqlAsyncGetResultAction;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.xpack.ql.async.QlStatusResponse;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -41,7 +42,7 @@
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class EqlSearchResponse extends ActionResponse implements ToXContentObject {
public class EqlSearchResponse extends ActionResponse implements ToXContentObject, QlStatusResponse.AsyncStatus {

private final Hits hits;
private final long tookInMillis;
Expand Down Expand Up @@ -150,14 +151,17 @@ public Hits hits() {
return hits;
}

@Override
public String id() {
return asyncExecutionId;
}

@Override
public boolean isRunning() {
return isRunning;
}

@Override
public boolean isPartial() {
return isPartial;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.eql.async.StoredAsyncTask;
import org.elasticsearch.xpack.core.async.StoredAsyncTask;

import java.util.Map;

Expand All @@ -27,18 +27,4 @@ public EqlSearchResponse getCurrentResult() {
return new EqlSearchResponse(EqlSearchResponse.Hits.EMPTY, System.currentTimeMillis() - getStartTime(), false,
getExecutionId().getEncoded(), true, true);
}

/**
* Returns the status from {@link EqlSearchTask}
*/
public static EqlStatusResponse getStatusResponse(EqlSearchTask asyncTask) {
return new EqlStatusResponse(
asyncTask.getExecutionId().getEncoded(),
true,
true,
asyncTask.getStartTime(),
asyncTask.getExpirationTimeMillis(),
null
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
package org.elasticsearch.xpack.eql.plugin;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.xpack.eql.action.EqlStatusResponse;
import org.elasticsearch.xpack.ql.async.QlStatusResponse;

public class EqlAsyncGetStatusAction extends ActionType<EqlStatusResponse> {
public class EqlAsyncGetStatusAction extends ActionType<QlStatusResponse> {
public static final EqlAsyncGetStatusAction INSTANCE = new EqlAsyncGetStatusAction();
public static final String NAME = "cluster:monitor/eql/async/status";

private EqlAsyncGetStatusAction() {
super(NAME, EqlStatusResponse::new);
super(NAME, QlStatusResponse::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public List<Setting<?>> getSettings() {
return org.elasticsearch.core.List.of(
new ActionHandler<>(EqlSearchAction.INSTANCE, TransportEqlSearchAction.class),
new ActionHandler<>(EqlStatsAction.INSTANCE, TransportEqlStatsAction.class),
new ActionHandler<>(EqlAsyncGetResultAction.INSTANCE, TransportEqlAsyncGetResultAction.class),
new ActionHandler<>(EqlAsyncGetResultAction.INSTANCE, TransportEqlAsyncGetResultsAction.class),
new ActionHandler<>(EqlAsyncGetStatusAction.INSTANCE, TransportEqlAsyncGetStatusAction.class)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.eql.plugin;

import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.eql.EqlAsyncActionNames;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse;
import org.elasticsearch.xpack.eql.action.EqlSearchTask;
import org.elasticsearch.xpack.ql.plugin.AbstractTransportQlAsyncGetResultsAction;

public class TransportEqlAsyncGetResultsAction extends AbstractTransportQlAsyncGetResultsAction<EqlSearchResponse, EqlSearchTask> {

@Inject
public TransportEqlAsyncGetResultsAction(TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
NamedWriteableRegistry registry,
Client client,
ThreadPool threadPool,
BigArrays bigArrays) {
super(EqlAsyncActionNames.EQL_ASYNC_GET_RESULT_ACTION_NAME, transportService, actionFilters, clusterService, registry, client,
threadPool, bigArrays, EqlSearchTask.class);
}

@Override
public Writeable.Reader<EqlSearchResponse> responseReader() {
return EqlSearchResponse::new;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,21 @@
*/
package org.elasticsearch.xpack.eql.plugin;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse;
import org.elasticsearch.xpack.eql.action.EqlSearchTask;
import org.elasticsearch.xpack.eql.action.EqlStatusResponse;
import org.elasticsearch.xpack.eql.async.StoredAsyncResponse;
import org.elasticsearch.xpack.ql.plugin.AbstractTransportQlAsyncGetStatusAction;

import java.util.Objects;

import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;


public class TransportEqlAsyncGetStatusAction extends HandledTransportAction<GetAsyncStatusRequest, EqlStatusResponse> {
private final TransportService transportService;
private final ClusterService clusterService;
private final AsyncTaskIndexService<StoredAsyncResponse<EqlSearchResponse>> store;

public class TransportEqlAsyncGetStatusAction extends AbstractTransportQlAsyncGetStatusAction<EqlSearchResponse, EqlSearchTask> {
@Inject
public TransportEqlAsyncGetStatusAction(TransportService transportService,
ActionFilters actionFilters,
Expand All @@ -47,31 +29,12 @@ public TransportEqlAsyncGetStatusAction(TransportService transportService,
Client client,
ThreadPool threadPool,
BigArrays bigArrays) {
super(EqlAsyncGetStatusAction.NAME, transportService, actionFilters, GetAsyncStatusRequest::new);
this.transportService = transportService;
this.clusterService = clusterService;
Writeable.Reader<StoredAsyncResponse<EqlSearchResponse>> reader = in -> new StoredAsyncResponse<>(EqlSearchResponse::new, in);
this.store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService,
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, reader, registry, bigArrays);
super(EqlAsyncGetStatusAction.NAME, transportService, actionFilters, clusterService, registry, client, threadPool, bigArrays,
EqlSearchTask.class);
}

@Override
protected void doExecute(Task task, GetAsyncStatusRequest request, ActionListener<EqlStatusResponse> listener) {
AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId());
DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId());
DiscoveryNode localNode = clusterService.state().getNodes().getLocalNode();
if (node == null || Objects.equals(node, localNode)) {
store.retrieveStatus(
request,
taskManager,
EqlSearchTask.class,
EqlSearchTask::getStatusResponse,
EqlStatusResponse::getStatusFromStoredSearch,
listener
);
} else {
transportService.sendRequest(node, EqlAsyncGetStatusAction.NAME, request,
new ActionListenerResponseHandler<>(listener, EqlStatusResponse::new, ThreadPool.Names.SAME));
}
protected Writeable.Reader<EqlSearchResponse> responseReader() {
return EqlSearchResponse::new;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
import org.elasticsearch.xpack.eql.action.EqlSearchRequest;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse;
import org.elasticsearch.xpack.eql.action.EqlSearchTask;
import org.elasticsearch.xpack.eql.async.AsyncTaskManagementService;
import org.elasticsearch.xpack.eql.execution.PlanExecutor;
import org.elasticsearch.xpack.eql.parser.ParserParams;
import org.elasticsearch.xpack.eql.session.EqlConfiguration;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.ql.async.AsyncTaskManagementService;
import org.elasticsearch.xpack.ql.expression.Order;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

package org.elasticsearch.xpack.eql.async;
package org.elasticsearch.xpack.ql.async;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -34,6 +34,8 @@
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTask;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.StoredAsyncResponse;
import org.elasticsearch.xpack.core.async.StoredAsyncTask;

import java.io.IOException;
import java.util.Map;
Expand Down
Loading

0 comments on commit e3f06a6

Please sign in to comment.