forked from opensearch-project/opensearch-spark
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix Session state bug and improve Query Efficiency in REPL
This PR introduces a bug fix and enhancements to FlintREPL's session management and optimizes query execution methods. It addresses a specific issue where marking a session as 'dead' inadvertently triggered the creation of a new, unnecessary session. This behavior resulted in the new session entering a spin-wait state, leading to duplicate jobs. The improvements include: - **Introduction of `earlyExitFlag`**: A new flag, `earlyExitFlag`, has been introduced and is set to `true` under two conditions: when a job is excluded or when it is not associated with the current session's job run ID. This flag is evaluated in the shutdown hook to determine whether the session state should be marked as 'dead'. This change effectively prevents the unintended creation of duplicate sessions by the SQL plugin, ensuring resources are utilized more efficiently. - **Query Method Optimization**: The method for executing queries has been shifted from scrolling to search, eliminating the need for creating unnecessary scroll contexts. This adjustment enhances the performance and efficiency of query operations. - **Reversion of Previous Commit**: The PR reverts a previous change (opensearch-project@be82024) following the resolution of the related issue in the SQL plugin (opensearch-project/sql#2436), further streamlining the operation and maintenance of the system. **Testing**: 1. Integration tests were added to cover both REPL and streaming job functionalities, ensuring the robustness of the fixes. 2. Manual testing was conducted to validate the bug fix. Signed-off-by: Kaituo Li <[email protected]>
- Loading branch information
Showing
22 changed files
with
1,213 additions
and
51 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
48 changes: 48 additions & 0 deletions
48
flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchQueryReader.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core.storage; | ||
|
||
import org.opensearch.OpenSearchStatusException; | ||
import org.opensearch.action.search.ClearScrollRequest; | ||
import org.opensearch.action.search.SearchRequest; | ||
import org.opensearch.action.search.SearchResponse; | ||
import org.opensearch.action.search.SearchScrollRequest; | ||
import org.opensearch.client.RequestOptions; | ||
import org.opensearch.client.RestHighLevelClient; | ||
import org.opensearch.common.Strings; | ||
import org.opensearch.common.unit.TimeValue; | ||
import org.opensearch.flint.core.FlintOptions; | ||
import org.opensearch.flint.core.IRestHighLevelClient; | ||
import org.opensearch.search.builder.SearchSourceBuilder; | ||
|
||
import java.io.IOException; | ||
import java.util.Optional; | ||
import java.util.logging.Level; | ||
import java.util.logging.Logger; | ||
|
||
/** | ||
* {@link OpenSearchReader} using search. https://opensearch.org/docs/latest/api-reference/search/ | ||
*/ | ||
public class OpenSearchQueryReader extends OpenSearchReader { | ||
|
||
private static final Logger LOG = Logger.getLogger(OpenSearchQueryReader.class.getName()); | ||
|
||
public OpenSearchQueryReader(IRestHighLevelClient client, String indexName, SearchSourceBuilder searchSourceBuilder) { | ||
super(client, new SearchRequest().indices(indexName).source(searchSourceBuilder)); | ||
} | ||
|
||
/** | ||
* search. | ||
*/ | ||
Optional<SearchResponse> search(SearchRequest request) throws IOException { | ||
return Optional.of(client.search(request, RequestOptions.DEFAULT)); | ||
} | ||
|
||
/** | ||
* nothing to clean | ||
*/ | ||
void clean() throws IOException {} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.