Validate session with flint datasource passed in async job request (#2448) (#2450)

* Validate session with flint datasource passed in async job request

Currently, if a session is running with Datasource1 and the user makes an async API request that reuses the same session but targets a different catalog, Datasource2, the SQL plugin does not return a new session for Datasource2. This PR creates a new session whenever the requested datasource does not match the one bound to the session_id.
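
For illustration, the mismatch scenario looks like this; the request shape follows the async query API, while the datasource names and session id are hypothetical:

    sh$ curl -sS -H 'Content-Type: application/json' -X POST localhost:9200/_plugins/_async_query \
    ... -d '{"datasource":"datasource2","lang":"sql","query":"SELECT 1","sessionId":"<session id created for datasource1>"}'

Before this change the plugin reused the Datasource1 session for such a request; with this change it detects the mismatch and returns a fresh session bound to Datasource2.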

Testing done:
1. manual testing
2. added an integration test (IT).

* address comments

* add doc

---------


(cherry picked from commit 0176525)

Signed-off-by: Kaituo Li <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 4a1d7a3 commit 0f5b5ef
Showing 19 changed files with 406 additions and 30 deletions.
@@ -43,7 +43,9 @@ public enum Key {
SESSION_INDEX_TTL("plugins.query.executionengine.spark.session.index.ttl"),
RESULT_INDEX_TTL("plugins.query.executionengine.spark.result.index.ttl"),
AUTO_INDEX_MANAGEMENT_ENABLED(
-    "plugins.query.executionengine.spark.auto_index_management.enabled");
+    "plugins.query.executionengine.spark.auto_index_management.enabled"),
+ SESSION_INACTIVITY_TIMEOUT_MILLIS(
+    "plugins.query.executionengine.spark.session_inactivity_timeout_millis");

@Getter private final String keyValue;

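For context, a sketch of how such a key is typically read elsewhere in the plugin; the accessor name is assumed from the Settings abstraction and may differ:

    // Assumed accessor on the Settings abstraction; the concrete API may differ.
    Long timeout = settings.getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS);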
145 changes: 145 additions & 0 deletions docs/user/admin/settings.rst
@@ -418,3 +418,148 @@ SQL query::
}
}


plugins.query.executionengine.spark.session_inactivity_timeout_millis
======================================================================

Description
-----------

This setting determines how long a session can go without an update before it is considered stale. The default
timeout is 3 minutes (180,000 milliseconds).

* Default Value: 180000 (milliseconds)
* Scope: Node-level
* Dynamic Update: Yes, this setting can be updated dynamically.

For example, to change the session inactivity timeout to 10 minutes, use the following command:

SQL query::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \
... -d '{"transient":{"plugins.query.executionengine.spark.session_inactivity_timeout_millis":600000}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"query": {
"executionengine": {
"spark": {
"session_inactivity_timeout_millis": "600000"
}
}
}
}
}
}


plugins.query.executionengine.spark.auto_index_management.enabled
======================================================================

Description
-----------
This setting controls the automatic management of request and result indices for each data source. When enabled, it
deletes outdated index documents.

* Default State: Enabled (true)
* Purpose: Automates cleanup of outdated request and result indices.

To disable auto index management, use the following command:

SQL query::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \
... -d '{"transient":{"plugins.query.executionengine.spark.auto_index_management.enabled":false}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"query": {
"executionengine": {
"spark": {
"auto_index_management": {
"enabled": "false"
}
}
}
}
}
}
}


plugins.query.executionengine.spark.session.index.ttl
============================================================

Description
-----------
This setting defines the time-to-live (TTL) for request indices when plugins.query.executionengine.spark.auto_index_management.enabled
is true. By default, request indices older than 14 days are deleted.

* Default Value: 14 days

For example, to change the TTL to 30 days, execute the following command:

SQL query::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \
... -d '{"transient":{"plugins.query.executionengine.spark.session.index.ttl":"30d"}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"query": {
"executionengine": {
"spark": {
"session": {
"index": {
"ttl": "30d"
}
}
}
}
}
}
}
}


plugins.query.executionengine.spark.result.index.ttl
============================================================

Description
-----------
This setting specifies the TTL for result indices when plugins.query.executionengine.spark.auto_index_management.enabled
is set to true. The default setting is to delete result indices older than 60 days.

* Default Value: 60 days

For example, to modify the TTL to 30 days, use this command:

SQL query::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \
... -d '{"transient":{"plugins.query.executionengine.spark.result.index.ttl":"30d"}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"query": {
"executionengine": {
"spark": {
"result": {
"index": {
"ttl": "30d"
}
}
}
}
}
}
}
}
@@ -179,6 +179,13 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<Long> SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING =
Setting.longSetting(
Key.SESSION_INACTIVITY_TIMEOUT_MILLIS.getKeyValue(),
180000L,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
@@ -287,6 +294,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
new Updater(Key.DATASOURCES_LIMIT));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
register(
settingBuilder,
clusterSettings,
Key.SESSION_INACTIVITY_TIMEOUT_MILLIS,
SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING,
new Updater(Key.SESSION_INACTIVITY_TIMEOUT_MILLIS));
defaultSettings = settingBuilder.build();
}

@@ -356,6 +369,7 @@ public static List<Setting<?>> pluginSettings() {
.add(RESULT_INDEX_TTL_SETTING)
.add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING)
.add(DATASOURCES_LIMIT_SETTING)
.add(SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING)
.build();
}

9 changes: 9 additions & 0 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
@@ -18,6 +18,7 @@ statement
: skippingIndexStatement
| coveringIndexStatement
| materializedViewStatement
| indexJobManagementStatement
;

skippingIndexStatement
@@ -109,6 +110,14 @@ dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;

indexJobManagementStatement
: recoverIndexJobStatement
;

recoverIndexJobStatement
: RECOVER INDEX JOB identifier
;

/*
* Match all remaining tokens in non-greedy way
* so WITH clause won't be captured by this rule.
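For illustration, the new recoverIndexJobStatement rule would match a statement of this shape (the job name below is hypothetical):

    RECOVER INDEX JOB `flint_myglue_default_http_logs_skipping_index`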
2 changes: 2 additions & 0 deletions spark/src/main/antlr/SparkSqlBase.g4
@@ -165,10 +165,12 @@ IF: 'IF';
IN: 'IN';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
JOB: 'JOB';
MATERIALIZED: 'MATERIALIZED';
NOT: 'NOT';
ON: 'ON';
PARTITION: 'PARTITION';
RECOVER: 'RECOVER';
REFRESH: 'REFRESH';
SHOW: 'SHOW';
TRUE: 'TRUE';
1 change: 1 addition & 0 deletions spark/src/main/antlr/SqlBaseLexer.g4
@@ -408,6 +408,7 @@ VALUES: 'VALUES';
VARCHAR: 'VARCHAR';
VAR: 'VAR';
VARIABLE: 'VARIABLE';
VARIANT: 'VARIANT';
VERSION: 'VERSION';
VIEW: 'VIEW';
VIEWS: 'VIEWS';
16 changes: 14 additions & 2 deletions spark/src/main/antlr/SqlBaseParser.g4
@@ -298,6 +298,10 @@ replaceTableHeader
: (CREATE OR)? REPLACE TABLE identifierReference
;

clusterBySpec
: CLUSTER BY LEFT_PAREN multipartIdentifierList RIGHT_PAREN
;

bucketSpec
: CLUSTERED BY identifierList
(SORTED BY orderedIdentifierList)?
@@ -383,6 +387,7 @@ createTableClauses
:((OPTIONS options=expressionPropertyList) |
(PARTITIONED BY partitioning=partitionFieldList) |
skewSpec |
clusterBySpec |
bucketSpec |
rowFormat |
createFileFormat |
@@ -582,6 +587,10 @@ notMatchedBySourceAction
| UPDATE SET assignmentList
;

exceptClause
: EXCEPT LEFT_PAREN exceptCols=multipartIdentifierList RIGHT_PAREN
;

assignmentList
: assignment (COMMA assignment)*
;
@@ -964,8 +973,8 @@ primaryExpression
| LAST LEFT_PAREN expression (IGNORE NULLS)? RIGHT_PAREN #last
| POSITION LEFT_PAREN substr=valueExpression IN str=valueExpression RIGHT_PAREN #position
| constant #constantDefault
- | ASTERISK #star
- | qualifiedName DOT ASTERISK #star
+ | ASTERISK exceptClause? #star
+ | qualifiedName DOT ASTERISK exceptClause? #star
| LEFT_PAREN namedExpression (COMMA namedExpression)+ RIGHT_PAREN #rowConstructor
| LEFT_PAREN query RIGHT_PAREN #subqueryExpression
| functionName LEFT_PAREN (setQuantifier? argument+=functionArgument
@@ -1081,6 +1090,7 @@ type
| DECIMAL | DEC | NUMERIC
| VOID
| INTERVAL
| VARIANT
| ARRAY | STRUCT | MAP
| unsupportedType=identifier
;
@@ -1540,6 +1550,7 @@ ansiNonReserved
| VARCHAR
| VAR
| VARIABLE
| VARIANT
| VERSION
| VIEW
| VIEWS
@@ -1888,6 +1899,7 @@ nonReserved
| VARCHAR
| VAR
| VARIABLE
| VARIANT
| VERSION
| VIEW
| VIEWS
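For illustration, the remaining SqlBaseParser.g4 changes track newer Spark SQL syntax; sketches of statements the updated grammar would accept, with hypothetical table and column names:

    -- CLUSTER BY clause in CREATE TABLE (clusterBySpec)
    CREATE TABLE events (id BIGINT, ts TIMESTAMP) CLUSTER BY (id);

    -- column exclusion on a star expression (exceptClause)
    SELECT * EXCEPT (ts) FROM events;

    -- VARIANT data type
    CREATE TABLE raw_events (payload VARIANT);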
@@ -86,7 +86,8 @@ public DispatchQueryResponse submit(
session = createdSession.get();
}
}
- if (session == null || !session.isReady()) {
+ if (session == null
+     || !session.isOperationalForDataSource(dispatchQueryRequest.getDatasource())) {
// create a new session if none exists, or if the existing session is dead/failed, stale, or bound to a different datasource
tags.put(JOB_TYPE_TAG_KEY, JobType.INTERACTIVE.getText());
session =
@@ -25,6 +25,7 @@
import org.opensearch.sql.spark.execution.statement.StatementId;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.rest.model.LangType;
import org.opensearch.sql.spark.utils.TimeProvider;

/**
* Interactive session.
@@ -42,6 +43,9 @@ public class InteractiveSession implements Session {
private final StateStore stateStore;
private final EMRServerlessClient serverlessClient;
private SessionModel sessionModel;
// elapsed time in milliseconds after which a session is considered stale
private long sessionInactivityTimeoutMilli;
private TimeProvider timeProvider;

@Override
public void open(CreateSessionRequest createSessionRequest) {
@@ -134,7 +138,14 @@ public Optional<Statement> get(StatementId stID) {
}

@Override
- public boolean isReady() {
-   return sessionModel.getSessionState() != DEAD && sessionModel.getSessionState() != FAIL;
+ public boolean isOperationalForDataSource(String dataSourceName) {
+   boolean isSessionStateValid =
+       sessionModel.getSessionState() != DEAD && sessionModel.getSessionState() != FAIL;
+   boolean isDataSourceMatch = sessionId.getDataSourceName().equals(dataSourceName);
+   boolean isSessionUpdatedRecently =
+       timeProvider.currentEpochMillis() - sessionModel.getLastUpdateTime()
+           <= sessionInactivityTimeoutMilli;
+
+   return isSessionStateValid && isDataSourceMatch && isSessionUpdatedRecently;
}
}
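The staleness check relies on the newly imported org.opensearch.sql.spark.utils.TimeProvider. A minimal sketch of that abstraction, assuming it is a simple injectable clock (the implementation class name below is hypothetical); injecting a clock lets tests fake the current time when exercising the inactivity timeout:

    // Clock abstraction so the session-staleness check can be tested deterministically.
    public interface TimeProvider {
      long currentEpochMillis();
    }

    // Production implementation backed by the system clock (class name hypothetical).
    public class RealTimeProvider implements TimeProvider {
      @Override
      public long currentEpochMillis() {
        return System.currentTimeMillis();
      }
    }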
@@ -39,5 +39,5 @@ public interface Session {
SessionId getSessionId();

/** Return true if the session can serve requests for the given datasource. */
- boolean isReady();
+ boolean isOperationalForDataSource(String dataSourceName);
}
