Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sessionId parameters for create async query API #2312

Merged
merged 17 commits into from
Oct 18, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.common.setting;

import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_SESSION_ENABLED;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import java.util.List;
Expand Down Expand Up @@ -36,7 +38,8 @@ public enum Key {
METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),
SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name");
CLUSTER_NAME("cluster.name"),
SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled");

@Getter private final String keyValue;

Expand All @@ -60,4 +63,9 @@ public static Optional<Key> of(String keyValue) {
public abstract <T> T getSettingValue(Key key);

public abstract List<?> getSettings();

/** Helper class */
public static boolean isSparkExecutionSessionEnabled(Settings settings) {
return settings.getSettingValue(SPARK_EXECUTION_SESSION_ENABLED);
}
}
36 changes: 36 additions & 0 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,39 @@ SQL query::
"status": 400
}

plugins.query.executionengine.spark.session.enabled
===================================================

Description
-----------

By default, execution engine is executed in job mode. You can enable session mode by this setting.

1. The default value is false.
2. This setting is node scope.
3. This setting can be updated dynamically.

You can update the setting with a new value like this.

SQL query::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings \
... -d '{"transient":{"plugins.query.executionengine.spark.session.enabled":"false"}}'
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"query": {
"executionengine": {
"spark": {
"session": {
"enabled": "false"
}
}
}
}
}
}
}

44 changes: 44 additions & 0 deletions docs/user/interfaces/asyncqueryinterface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,50 @@ Sample Response::
"queryId": "00fd796ut1a7eg0q"
}

Execute query in session
------------------------

if plugins.query.executionengine.spark.session.enabled is set to true, session based execution is enabled. Under the hood, all queries submitted to the same session will be executed in the same SparkContext. Session is auto closed if not query submission in 10 minutes.

Async query response include ``sessionId`` indicate the query is executed in session.

Sample Request::

curl --location 'http://localhost:9200/_plugins/_async_query' \
--header 'Content-Type: application/json' \
--data '{
"datasource" : "my_glue",
"lang" : "sql",
"query" : "select * from my_glue.default.http_logs limit 10"
}'

Sample Response::

{
"queryId": "HlbM61kX6MDkAktO",
"sessionId": "1Giy65ZnzNlmsPAm"
}

User could reuse the session by using ``sessionId`` query parameters.

Sample Request::

curl --location 'http://localhost:9200/_plugins/_async_query' \
--header 'Content-Type: application/json' \
--data '{
"datasource" : "my_glue",
"lang" : "sql",
"query" : "select * from my_glue.default.http_logs limit 10",
"sessionId" : "1Giy65ZnzNlmsPAm"
}'

Sample Response::

{
"queryId": "7GC4mHhftiTejvxN",
"sessionId": "1Giy65ZnzNlmsPAm"
}


Async Query Result API
======================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<?> SPARK_EXECUTION_SESSION_ENABLED_SETTING =
Setting.boolSetting(
Key.SPARK_EXECUTION_SESSION_ENABLED.getKeyValue(),
vmmusings marked this conversation as resolved.
Show resolved Hide resolved
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -205,6 +212,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.SPARK_EXECUTION_ENGINE_CONFIG,
SPARK_EXECUTION_ENGINE_CONFIG,
new Updater(Key.SPARK_EXECUTION_ENGINE_CONFIG));
register(
settingBuilder,
clusterSettings,
Key.SPARK_EXECUTION_SESSION_ENABLED,
SPARK_EXECUTION_SESSION_ENABLED_SETTING,
new Updater(Key.SPARK_EXECUTION_SESSION_ENABLED));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
defaultSettings = settingBuilder.build();
Expand Down Expand Up @@ -270,6 +283,7 @@ public static List<Setting<?>> pluginSettings() {
.add(METRICS_ROLLING_INTERVAL_SETTING)
.add(DATASOURCE_URI_HOSTS_DENY_LIST)
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.add(SPARK_EXECUTION_SESSION_ENABLED_SETTING)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG;
import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.emrserverless.AWSEMRServerless;
Expand Down Expand Up @@ -99,6 +100,8 @@
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplierImpl;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.execution.session.SessionManager;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
Expand Down Expand Up @@ -318,7 +321,11 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService(
new DataSourceUserAuthorizationHelperImpl(client),
jobExecutionResponseReader,
new FlintIndexMetadataReaderImpl(client),
client);
client,
new SessionManager(
new StateStore(SPARK_REQUEST_BUFFER_INDEX_NAME, client),
emrServerlessClient,
pluginSettings));
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService,
sparkQueryDispatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,17 @@ public CreateAsyncQueryResponse createAsyncQuery(
createAsyncQueryRequest.getLang(),
sparkExecutionEngineConfig.getExecutionRoleARN(),
sparkExecutionEngineConfig.getClusterName(),
sparkExecutionEngineConfig.getSparkSubmitParameters()));
sparkExecutionEngineConfig.getSparkSubmitParameters(),
createAsyncQueryRequest.getSessionId()));
asyncQueryJobMetadataStorageService.storeJobMetadata(
new AsyncQueryJobMetadata(
sparkExecutionEngineConfig.getApplicationId(),
dispatchQueryResponse.getJobId(),
dispatchQueryResponse.isDropIndexQuery(),
dispatchQueryResponse.getResultIndex()));
return new CreateAsyncQueryResponse(dispatchQueryResponse.getJobId());
dispatchQueryResponse.getResultIndex(),
dispatchQueryResponse.getSessionId()));
return new CreateAsyncQueryResponse(
dispatchQueryResponse.getJobId(), dispatchQueryResponse.getSessionId());
}

@Override
Expand All @@ -81,6 +84,7 @@ public AsyncQueryExecutionResponse getAsyncQueryResults(String queryId) {
Optional<AsyncQueryJobMetadata> jobMetadata =
asyncQueryJobMetadataStorageService.getJobMetadata(queryId);
if (jobMetadata.isPresent()) {
String sessionId = jobMetadata.get().getSessionId();
JSONObject jsonObject = sparkQueryDispatcher.getQueryResponse(jobMetadata.get());
if (JobRunState.SUCCESS.toString().equals(jsonObject.getString(STATUS_FIELD))) {
DefaultSparkSqlFunctionResponseHandle sparkSqlFunctionResponseHandle =
Expand All @@ -90,13 +94,18 @@ public AsyncQueryExecutionResponse getAsyncQueryResults(String queryId) {
result.add(sparkSqlFunctionResponseHandle.next());
}
return new AsyncQueryExecutionResponse(
JobRunState.SUCCESS.toString(), sparkSqlFunctionResponseHandle.schema(), result, null);
JobRunState.SUCCESS.toString(),
sparkSqlFunctionResponseHandle.schema(),
result,
null,
sessionId);
} else {
return new AsyncQueryExecutionResponse(
jsonObject.optString(STATUS_FIELD, JobRunState.FAILED.toString()),
null,
null,
jsonObject.optString(ERROR_FIELD, ""));
jsonObject.optString(ERROR_FIELD, ""),
sessionId);
}
}
throw new AsyncQueryNotFoundException(String.format("QueryId: %s not found", queryId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ public class AsyncQueryExecutionResponse {
private final ExecutionEngine.Schema schema;
private final List<ExprValue> results;
private final String error;
private final String sessionId;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@
private String jobId;
private boolean isDropIndexQuery;
private String resultIndex;
// optional sessionId.
private String sessionId;

public AsyncQueryJobMetadata(String applicationId, String jobId, String resultIndex) {
this.applicationId = applicationId;
this.jobId = jobId;
this.isDropIndexQuery = false;
this.resultIndex = resultIndex;
this.sessionId = null;
}

@Override
Expand All @@ -57,6 +60,7 @@
builder.field("applicationId", metadata.getApplicationId());
builder.field("isDropIndexQuery", metadata.isDropIndexQuery());
builder.field("resultIndex", metadata.getResultIndex());
builder.field("sessionId", metadata.getSessionId());
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -92,6 +96,7 @@
String applicationId = null;
boolean isDropIndexQuery = false;
String resultIndex = null;
String sessionId = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
Expand All @@ -109,13 +114,17 @@
case "resultIndex":
resultIndex = parser.textOrNull();
break;
case "sessionId":
sessionId = parser.textOrNull();
break;

Check warning on line 119 in spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java#L118-L119

Added lines #L118 - L119 were not covered by tests
default:
throw new IllegalArgumentException("Unknown field: " + fieldName);
}
}
if (jobId == null || applicationId == null) {
throw new IllegalArgumentException("jobId and applicationId are required fields.");
}
return new AsyncQueryJobMetadata(applicationId, jobId, isDropIndexQuery, resultIndex);
return new AsyncQueryJobMetadata(
applicationId, jobId, isDropIndexQuery, resultIndex, sessionId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class SparkConstants {
public static final String SPARK_SQL_APPLICATION_JAR =
"file:///home/hadoop/.ivy2/jars/org.opensearch_opensearch-spark-sql-application_2.12-0.1.0-SNAPSHOT.jar";
public static final String SPARK_RESPONSE_BUFFER_INDEX_NAME = ".query_execution_result";
public static final String SPARK_REQUEST_BUFFER_INDEX_NAME = ".query_execution_request";
// TODO should be replaced with mvn jar.
public static final String FLINT_INTEGRATION_JAR =
"s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar";
Expand Down
Loading
Loading