forked from opensearch-project/sql
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Peng Huo <[email protected]>
- Loading branch information
Showing
23 changed files
with
758 additions
and
181 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
126 changes: 126 additions & 0 deletions
126
spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.dispatcher; | ||
|
||
import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult; | ||
|
||
import com.amazonaws.services.emrserverless.model.JobRunState; | ||
import lombok.RequiredArgsConstructor; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.json.JSONObject; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.sql.datasource.DataSourceService; | ||
import org.opensearch.sql.datasource.model.DataSourceMetadata; | ||
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; | ||
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; | ||
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; | ||
import org.opensearch.sql.spark.client.EMRServerlessClient; | ||
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; | ||
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; | ||
import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; | ||
import org.opensearch.sql.spark.dispatcher.model.IndexDetails; | ||
import org.opensearch.sql.spark.execution.statestore.StateStore; | ||
import org.opensearch.sql.spark.flint.FlintIndexMetadata; | ||
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; | ||
import org.opensearch.sql.spark.flint.operation.FlintIndexOp; | ||
import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel; | ||
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDelete; | ||
import org.opensearch.sql.spark.response.JobExecutionResponseReader; | ||
|
||
/** | ||
* Handle Index DML query. includes | ||
* * DROP | ||
* * ALT? | ||
*/ | ||
@RequiredArgsConstructor | ||
public class IndexDMLHandler extends AsyncQueryHandler { | ||
private static final Logger LOG = LogManager.getLogger(); | ||
|
||
public static final String DROP_INDEX_JOB_ID = "dropIndexJobId"; | ||
|
||
private final EMRServerlessClient emrServerlessClient; | ||
|
||
private final DataSourceService dataSourceService; | ||
|
||
private final DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper; | ||
|
||
private final JobExecutionResponseReader jobExecutionResponseReader; | ||
|
||
private final FlintIndexMetadataReader flintIndexMetadataReader; | ||
|
||
private final Client client; | ||
|
||
private final StateStore stateStore; | ||
|
||
public static boolean isIndexDMLQuery(String jobId) { | ||
return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId); | ||
} | ||
|
||
public DispatchQueryResponse handle( | ||
DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) { | ||
DataSourceMetadata dataSourceMetadata = | ||
dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()); | ||
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata); | ||
FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails); | ||
// if index is created without auto refresh. there is no job to cancel. | ||
String status = JobRunState.FAILED.toString(); | ||
StringBuilder errorBuilder = new StringBuilder(); | ||
long startTime = 0L; | ||
try { | ||
FlintIndexOp jobCancelOp = | ||
new FlintIndexOpCancel(stateStore, dispatchQueryRequest.getDatasource(), | ||
emrServerlessClient, | ||
dispatchQueryRequest.getApplicationId(), indexMetadata.getJobId()); | ||
jobCancelOp.apply(indexMetadata); | ||
} catch (Exception e) { | ||
errorBuilder.append(e.getMessage()); | ||
errorBuilder.append("\n"); | ||
LOG.error(e); | ||
} | ||
try { | ||
FlintIndexOp indexDeleteOp = | ||
new FlintIndexOpDelete(stateStore, dispatchQueryRequest.getDatasource(), | ||
client, indexDetails); | ||
indexDeleteOp.apply(indexMetadata); | ||
status = JobRunState.SUCCESS.toString(); | ||
} catch (Exception e) { | ||
errorBuilder.append(e.getMessage()); | ||
errorBuilder.append("\n"); | ||
LOG.error(e); | ||
} | ||
|
||
AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()); | ||
IndexDMLResult indexDMLResult = | ||
new IndexDMLResult(asyncQueryId.getId(), status, errorBuilder.toString(), | ||
dispatchQueryRequest.getDatasource(), System.currentTimeMillis() - startTime, | ||
System.currentTimeMillis()); | ||
createIndexDMLResult(stateStore, dataSourceMetadata.getResultIndex()).apply(indexDMLResult); | ||
|
||
return new DispatchQueryResponse( | ||
asyncQueryId, | ||
DROP_INDEX_JOB_ID, | ||
dataSourceMetadata.getResultIndex(), | ||
null); | ||
} | ||
|
||
@Override | ||
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) { | ||
String queryId = asyncQueryJobMetadata.getQueryId().getId(); | ||
return jobExecutionResponseReader.getResultWithQueryId( | ||
queryId, asyncQueryJobMetadata.getResultIndex()); | ||
} | ||
|
||
@Override | ||
protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) { | ||
throw new IllegalStateException("[BUG] can't fetch result of index DML query form server"); | ||
} | ||
|
||
@Override | ||
String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { | ||
throw new IllegalArgumentException("can't cancel index DML query"); | ||
} | ||
} |
Oops, something went wrong.