Correctly Set query status (#2227) (#2231) (#2232)
* Correctly Set query status

Previously, the query status was set to SUCCESS only when the EMR-S job status was 'SUCCESS'. However, for index queries, the result should indicate success even while the EMR-S job run state is still 'RUNNING'. This PR resolves that inconsistency.

Tests done:
* Manual verification: Created a skipping index and confirmed the async query result is marked 'successful' instead of 'running'.
* Updated relevant unit tests.

* add unit test

---------

(cherry picked from commit cd9d768)




(cherry picked from commit 00ae8ce)

Signed-off-by: Kaituo Li <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 9aaf81d commit c2b6fbb
Showing 5 changed files with 52 additions and 24 deletions.
15 changes: 11 additions & 4 deletions docs/user/ppl/admin/connectors/s3glue_connector.rst
@@ -14,10 +14,18 @@ S3Glue Connector
Introduction
============

Properties in DataSource Configuration

* name: A unique identifier for the data source within a domain.
* connector: Currently supports the following connectors: s3glue, spark, prometheus, and opensearch.
* resultIndex: Stores the results of queries executed on the data source. If unavailable, it defaults to .query_execution_result.

Glue Connector
========================================================

The s3Glue connector provides a way to query S3 files using Glue as the metadata store and Spark as the execution engine.
This page covers s3Glue datasource configuration and how to query an s3Glue datasource.


Required resources for s3 Glue Connector
========================================
* S3: This is where the data lies.
@@ -27,8 +35,6 @@ Required resources for s3 Glue Connector

We currently only support emr-serverless as the Spark execution engine and Glue as the metadata store. We will add more support in the future.

Glue Connector Properties in DataSource Configuration
========================================================
Glue Connector Properties.

* ``glue.auth.type`` [Required]
@@ -59,7 +65,8 @@ Glue datasource configuration::
"glue.indexstore.opensearch.auth" :"basicauth",
"glue.indexstore.opensearch.auth.username" :"username"
"glue.indexstore.opensearch.auth.password" :"password"
}
},
"resultIndex": "query_execution_result"
}]

[{
@@ -63,34 +63,38 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest)
}
}

// TODO : Fetch from Result Index and then make call to EMR Serverless.
public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) {
GetJobRunResult getJobRunResult =
emrServerlessClient.getJobRunResult(
asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId());
String jobState = getJobRunResult.getJobRun().getState();
// either empty json when the result is not available or data with status
// Fetch from Result Index
JSONObject result =
(jobState.equals(JobRunState.SUCCESS.toString()))
? jobExecutionResponseReader.getResultFromOpensearchIndex(
asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex())
: new JSONObject();
jobExecutionResponseReader.getResultFromOpensearchIndex(
asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex());

// if result index document has a status, we are gonna use the status directly; otherwise, we
// will use emr-s job status
// a job is successful does not mean there is no error in execution. For example, even if result
// index mapping
// is incorrect, we still write query result and let the job finish.
// will use emr-s job status.
// That a job is successful does not mean there is no error in execution. For example, even if
// the result index mapping is incorrect, we still write the query result and let the job finish.
// That a job is running does not mean the status is running. For example, an index/streaming
// query is a long-running job which runs forever, but we need to return success from the
// result index immediately.
if (result.has(DATA_FIELD)) {
JSONObject items = result.getJSONObject(DATA_FIELD);

// If items have STATUS_FIELD, use it; otherwise, use jobState
String status = items.optString(STATUS_FIELD, jobState);
// If items have STATUS_FIELD, use it; otherwise, mark failed
String status = items.optString(STATUS_FIELD, JobRunState.FAILED.toString());
result.put(STATUS_FIELD, status);

// If items have ERROR_FIELD, use it; otherwise, set empty string
String error = items.optString(ERROR_FIELD, "");
result.put(ERROR_FIELD, error);
} else {
// make call to EMR Serverless when related result index documents are not available
GetJobRunResult getJobRunResult =
emrServerlessClient.getJobRunResult(
asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId());
String jobState = getJobRunResult.getJobRun().getState();
result.put(STATUS_FIELD, jobState);
result.put(ERROR_FIELD, "");
}
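The precedence these branches implement can be restated as a minimal standalone sketch. The types and names here (`QueryStatusResolver`, `resolveStatus`, a plain `Map` standing in for `JSONObject`) are illustrative, not the plugin's actual API:

```java
import java.util.Map;
import java.util.Optional;

// Minimal sketch of the corrected status precedence (illustrative, not the plugin's API):
//   1. use the status written into the result index document, if present;
//   2. if a result document exists but carries no status, report FAILED;
//   3. only when no result document exists, fall back to the EMR-S job run state.
public class QueryStatusResolver {

  static String resolveStatus(Optional<Map<String, String>> resultDoc, String emrJobState) {
    if (resultDoc.isPresent()) {
      // A document without a status field indicates a malformed result, so report
      // FAILED instead of echoing the (possibly still RUNNING) job state.
      return resultDoc.get().getOrDefault("status", "FAILED");
    }
    // No result document yet: the EMR-S job run state is the best signal available.
    return emrJobState;
  }

  public static void main(String[] args) {
    // An index query: the EMR-S job keeps RUNNING, but the result doc already says SUCCESS.
    System.out.println(resolveStatus(Optional.of(Map.of("status", "SUCCESS")), "RUNNING"));
  }
}
```

This matches the fix's intent: a long-running index/streaming job can report a successful query as soon as the result document lands, without waiting for the EMR-S job to finish.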
@@ -16,6 +16,7 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
@@ -46,8 +47,14 @@ private JSONObject searchInSparkIndex(QueryBuilder query, String resultIndex) {
searchSourceBuilder.query(query);
searchRequest.source(searchSourceBuilder);
ActionFuture<SearchResponse> searchResponseActionFuture;
JSONObject data = new JSONObject();
try {
searchResponseActionFuture = client.search(searchRequest);
} catch (IndexNotFoundException e) {
// If there is no result index (e.g., EMR-S has not created the index yet), return empty JSON.
LOG.info(resultIndex + " is not created yet.");
return data;
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -59,7 +66,6 @@ private JSONObject searchInSparkIndex(QueryBuilder query, String resultIndex) {
+ " index failed with status : "
+ searchResponse.status());
} else {
JSONObject data = new JSONObject();
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
data.put(DATA_FIELD, searchHit.getSourceAsMap());
}
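The lazy-index fallback above follows a simple pattern: treat a missing result index as "no result yet" rather than as an error, since EMR-S only creates the index on first write. A self-contained sketch of that pattern, where the `SearchClient` interface and `IndexMissing` exception are stand-ins rather than the OpenSearch client API:

```java
import java.util.Collections;
import java.util.Map;

public class ResultIndexLookup {

  // Stand-in for the search client; not the OpenSearch API.
  interface SearchClient {
    Map<String, Object> search(String index) throws IndexMissing;
  }

  // Stand-in for org.opensearch.index.IndexNotFoundException.
  static class IndexMissing extends Exception {}

  static Map<String, Object> fetchResult(SearchClient client, String index) {
    try {
      return client.search(index);
    } catch (IndexMissing e) {
      // The result index has not been created yet: report "no result" instead of
      // failing, so the caller can fall back to the EMR-S job state.
      return Collections.emptyMap();
    }
  }

  public static void main(String[] args) {
    SearchClient notCreatedYet = index -> { throw new IndexMissing(); };
    // Missing index yields an empty result rather than an exception.
    System.out.println(fetchResult(notCreatedYet, "query_execution_result").isEmpty());
  }
}
```

Any other exception still propagates, so genuine search failures are not silently swallowed.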
@@ -613,11 +613,14 @@ void testGetQueryResponse() {
flintIndexMetadataReader);
when(emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, EMR_JOB_ID))
.thenReturn(new GetJobRunResult().withJobRun(new JobRun().withState(JobRunState.PENDING)));

// simulate result index is not created yet
when(jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null))
.thenReturn(new JSONObject());
JSONObject result =
sparkQueryDispatcher.getQueryResponse(
new AsyncQueryJobMetadata(EMRS_APPLICATION_ID, EMR_JOB_ID, null));
Assertions.assertEquals("PENDING", result.get("status"));
verifyNoInteractions(jobExecutionResponseReader);
}

@Test
@@ -629,8 +632,6 @@ void testGetQueryResponseWithSuccess() {
dataSourceUserAuthorizationHelper,
jobExecutionResponseReader,
flintIndexMetadataReader);
when(emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, EMR_JOB_ID))
.thenReturn(new GetJobRunResult().withJobRun(new JobRun().withState(JobRunState.SUCCESS)));
JSONObject queryResult = new JSONObject();
Map<String, Object> resultMap = new HashMap<>();
resultMap.put(STATUS_FIELD, "SUCCESS");
@@ -641,7 +642,6 @@ void testGetQueryResponseWithSuccess() {
JSONObject result =
sparkQueryDispatcher.getQueryResponse(
new AsyncQueryJobMetadata(EMRS_APPLICATION_ID, EMR_JOB_ID, null));
verify(emrServerlessClient, times(1)).getJobRunResult(EMRS_APPLICATION_ID, EMR_JOB_ID);
verify(jobExecutionResponseReader, times(1)).getResultFromOpensearchIndex(EMR_JOB_ID, null);
Assertions.assertEquals(
new HashSet<>(Arrays.asList(DATA_FIELD, STATUS_FIELD, ERROR_FIELD)), result.keySet());
@@ -655,6 +655,7 @@ void testGetQueryResponseWithSuccess() {
// We need similar.
Assertions.assertTrue(dataJson.similar(result.get(DATA_FIELD)));
Assertions.assertEquals("SUCCESS", result.get(STATUS_FIELD));
verifyNoInteractions(emrServerlessClient);
}

@Test
@@ -7,6 +7,7 @@

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.spark.constants.TestConstants.EMR_JOB_ID;
@@ -24,6 +25,7 @@
import org.opensearch.client.Client;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;

@@ -91,4 +93,12 @@ public void testSearchFailure() {
RuntimeException.class,
() -> jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null));
}

@Test
public void testIndexNotFoundException() {
when(client.search(any())).thenThrow(IndexNotFoundException.class);
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
assertTrue(
jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, "foo").isEmpty());
}
}
