[Backport 2.x] Add pit for pagination query (#3076)

* Add pit for join queries (#2703)
  * Add search after for join
  * Enable search after by default
  * Add pit
  * nit
  * Fix tests
  * ignore joinWithGeoIntersectNL
  * Rerun CI with scroll
  * Remove unused code and retrigger CI with search_after true
  * Address comments
  * Remove unused code change
  * Update pit keep alive time with SQL_CURSOR_KEEP_ALIVE
  * Fix scroll condition
  * nit
  * Add pit before query execution
  * nit
  * Move pit from join request builder to executor
  * Remove unused methods
  * Add pit in parent class's run()
  * Add comment for fetching subsequent result in NestedLoopsElasticExecutor
  * Update comment
  * Add javadoc for pit handler
  * Add pit interface
  * Add pit handler unit test
  * Fix failed unit test CI
  * Fix spotless error
  * Rename pit class and add logs
  * Fix pit delete unit test

* Add pit for multi query (#2753)
  * Add search after for join
  * Enable search after by default
  * Add pit
  * nit
  * Fix tests
  * ignore joinWithGeoIntersectNL
  * Rerun CI with scroll
  * draft
  * Remove unused code and retrigger CI with search_after true
  * Address comments
  * Remove unused code change
  * Update pit keep alive time with SQL_CURSOR_KEEP_ALIVE
  * Fix scroll condition
  * nit
  * Add pit before query execution
  * Refactor get response with pit method
  * Update remaining scroll search calls
  * Fix integ test failures
  * nit
  * Move pit from join request builder to executor
  * Remove unused methods
  * Move pit from request to executor
  * Fix pit.delete call missed while merge
  * Move getResponseWithHits method to util class
  * add try catch for create delete pit in minus executor
  * move all common fields to ElasticHitsExecutor
  * add javadoc for ElasticHitsExecutor
  * Add missing javadoc
  * Forcing an empty commit as last commit is stuck processing updates

* Add pit to default cursor
* Run CI without pit unit test
* Rerun CI without pit unit test
* Fix unit tests for PIT changes
* Addressed comments

(cherry picked from commit 69853fe)

Signed-off-by: Rupal Mahajan <[email protected]>
Signed-off-by: Manasvini B S <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Rupal Mahajan <[email protected]>
3 people authored Oct 18, 2024
1 parent 66f5ae8 commit 69bacd1
Showing 12 changed files with 513 additions and 110 deletions.
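
In short: when the SQL_PAGINATION_API_SEARCH_AFTER setting is enabled, the legacy SQL cursor now carries a Point In Time (PIT) id, the last page's sort values, and the serialized search request instead of a scroll id. Below is a minimal sketch of unpacking such a cursor id for inspection; it assumes that encodeCursor/decodeCursor (not shown in this diff) are plain Base64 over the JSON payload, which is the only part not confirmed by the change itself.

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.json.JSONObject;

// Sketch only: unpack a cursor id produced by DefaultCursor#generateCursorId.
// Assumes the encoded part is Base64 over the JSON payload (encodeCursor is not shown here).
public class CursorIdSketch {
  public static JSONObject decode(String cursorId) {
    // generateCursorId returns "<cursor type id>:<encoded JSON>", so split once on ':'
    String[] parts = cursorId.split(":", 2);
    String payload = new String(Base64.getDecoder().decode(parts[1]), StandardCharsets.UTF_8);
    return new JSONObject(payload);
  }

  public static void main(String[] args) {
    JSONObject json = decode(args[0]);
    // With SQL_PAGINATION_API_SEARCH_AFTER enabled the payload carries a PIT id ("p") and
    // the last sort values ("h") instead of a scroll id ("s").
    System.out.println(json.optString("p", json.optString("s")));
  }
}
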
org/opensearch/sql/legacy/cursor/DefaultCursor.java
@@ -5,8 +5,19 @@

package org.opensearch.sql.legacy.cursor;

import static org.opensearch.core.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -18,6 +29,16 @@
import lombok.Setter;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.SearchModule;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.executor.format.Schema;

/**
Expand All @@ -40,6 +61,10 @@ public class DefaultCursor implements Cursor {
private static final String SCROLL_ID = "s";
private static final String SCHEMA_COLUMNS = "c";
private static final String FIELD_ALIAS_MAP = "a";
private static final String PIT_ID = "p";
private static final String SEARCH_REQUEST = "r";
private static final String SORT_FIELDS = "h";
private static final ObjectMapper objectMapper = new ObjectMapper();

/**
* To get mappings for index to check if type is date needed for
@@ -70,31 +95,85 @@ public class DefaultCursor implements Cursor {
/** To get next batch of result */
private String scrollId;

/** To get Point In Time */
private String pitId;

/** To get next batch of result with search after api */
private SearchSourceBuilder searchSourceBuilder;

/** To get last sort values * */
private Object[] sortFields;

/** To reduce the number of rows left by fetchSize */
@NonNull private Integer fetchSize;

private Integer limit;

/**
* {@link NamedXContentRegistry} from {@link SearchModule} used for construct {@link QueryBuilder}
* from DSL query string.
*/
private static final NamedXContentRegistry xContentRegistry =
new NamedXContentRegistry(
new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents());

@Override
public CursorType getType() {
return type;
}

@Override
public String generateCursorId() {
if (rowsLeft <= 0 || Strings.isNullOrEmpty(scrollId)) {
if (rowsLeft <= 0 || isCursorIdNullOrEmpty()) {
return null;
}
JSONObject json = new JSONObject();
json.put(FETCH_SIZE, fetchSize);
json.put(ROWS_LEFT, rowsLeft);
json.put(INDEX_PATTERN, indexPattern);
json.put(SCROLL_ID, scrollId);
json.put(SCHEMA_COLUMNS, getSchemaAsJson());
json.put(FIELD_ALIAS_MAP, fieldAliasMap);
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
json.put(PIT_ID, pitId);
String sortFieldValue =
AccessController.doPrivileged(
(PrivilegedAction<String>)
() -> {
try {
return objectMapper.writeValueAsString(sortFields);
} catch (JsonProcessingException e) {
throw new RuntimeException(
"Failed to parse sort fields from JSON string.", e);
}
});
json.put(SORT_FIELDS, sortFieldValue);
setSearchRequestString(json, searchSourceBuilder);
} else {
json.put(SCROLL_ID, scrollId);
}
return String.format("%s:%s", type.getId(), encodeCursor(json));
}

private void setSearchRequestString(JSONObject cursorJson, SearchSourceBuilder sourceBuilder) {
try {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
XContentBuilder builder = XContentFactory.jsonBuilder(outputStream);
sourceBuilder.toXContent(builder, null);
builder.close();

String searchRequestBase64 = Base64.getEncoder().encodeToString(outputStream.toByteArray());
cursorJson.put("searchSourceBuilder", searchRequestBase64);
} catch (IOException ex) {
throw new RuntimeException("Failed to set search request string on cursor json.", ex);
}
}

private boolean isCursorIdNullOrEmpty() {
return LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)
? Strings.isNullOrEmpty(pitId)
: Strings.isNullOrEmpty(scrollId);
}

public static DefaultCursor from(String cursorId) {
/**
* It is assumed that cursorId here is the second part of the original cursor passed by the
@@ -105,13 +184,50 @@ public static DefaultCursor from(String cursorId) {
cursor.setFetchSize(json.getInt(FETCH_SIZE));
cursor.setRowsLeft(json.getLong(ROWS_LEFT));
cursor.setIndexPattern(json.getString(INDEX_PATTERN));
cursor.setScrollId(json.getString(SCROLL_ID));
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
populateCursorForPit(json, cursor);
} else {
cursor.setScrollId(json.getString(SCROLL_ID));
}
cursor.setColumns(getColumnsFromSchema(json.getJSONArray(SCHEMA_COLUMNS)));
cursor.setFieldAliasMap(fieldAliasMap(json.getJSONObject(FIELD_ALIAS_MAP)));

return cursor;
}

private static void populateCursorForPit(JSONObject json, DefaultCursor cursor) {
cursor.setPitId(json.getString(PIT_ID));

cursor.setSortFields(getSortFieldsFromJson(json));

// Retrieve and set the SearchSourceBuilder from the JSON field
String searchSourceBuilderBase64 = json.getString("searchSourceBuilder");
byte[] bytes = Base64.getDecoder().decode(searchSourceBuilderBase64);
ByteArrayInputStream streamInput = new ByteArrayInputStream(bytes);
try {
XContentParser parser =
XContentType.JSON
.xContent()
.createParser(xContentRegistry, IGNORE_DEPRECATIONS, streamInput);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.fromXContent(parser);
cursor.setSearchSourceBuilder(sourceBuilder);
} catch (IOException ex) {
throw new RuntimeException("Failed to get searchSourceBuilder from cursor Id", ex);
}
}

private static Object[] getSortFieldsFromJson(JSONObject json) {
return AccessController.doPrivileged(
(PrivilegedAction<Object[]>)
() -> {
try {
return objectMapper.readValue(json.getString(SORT_FIELDS), Object[].class);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to parse sort fields from JSON string.", e);
}
});
}

private JSONArray getSchemaAsJson() {
JSONArray schemaJson = new JSONArray();

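
The cursor string has to round-trip the whole SearchSourceBuilder between requests, so DefaultCursor writes it out as JSON XContent, Base64-encodes it into the cursor payload, and re-parses it with a NamedXContentRegistry built from SearchModule; that registry is what lets QueryBuilder instances inside the stored request be reconstructed on the next page. A self-contained sketch of that round trip, using the same packages the diff imports (ToXContent.EMPTY_PARAMS is used here where the committed code passes null):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Base64;
import java.util.Collections;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.SearchModule;
import org.opensearch.search.builder.SearchSourceBuilder;

// Sketch of the SearchSourceBuilder round trip used by DefaultCursor:
// serialize to JSON XContent, Base64-encode for the cursor string, then
// parse it back with a registry obtained from SearchModule.
final class SearchSourceRoundTrip {
  private static final NamedXContentRegistry REGISTRY =
      new NamedXContentRegistry(
          new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents());

  static String encode(SearchSourceBuilder source) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (XContentBuilder builder = XContentFactory.jsonBuilder(out)) {
      source.toXContent(builder, ToXContent.EMPTY_PARAMS);
    }
    return Base64.getEncoder().encodeToString(out.toByteArray());
  }

  static SearchSourceBuilder decode(String base64) throws IOException {
    byte[] bytes = Base64.getDecoder().decode(base64);
    try (XContentParser parser =
        XContentType.JSON
            .xContent()
            .createParser(
                REGISTRY, DeprecationHandler.IGNORE_DEPRECATIONS, new ByteArrayInputStream(bytes))) {
      return SearchSourceBuilder.fromXContent(parser);
    }
  }
}
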
org/opensearch/sql/legacy/executor/cursor/CursorCloseExecutor.java
@@ -6,6 +6,7 @@
package org.opensearch.sql.legacy.executor.cursor;

import static org.opensearch.core.rest.RestStatus.OK;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.util.Map;
import org.apache.logging.log4j.LogManager;
@@ -18,8 +19,11 @@
import org.opensearch.rest.RestChannel;
import org.opensearch.sql.legacy.cursor.CursorType;
import org.opensearch.sql.legacy.cursor.DefaultCursor;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl;
import org.opensearch.sql.legacy.rewriter.matchtoterm.VerificationException;

public class CursorCloseExecutor implements CursorRestExecutor {
@@ -79,14 +83,26 @@ public String execute(Client client, Map<String, String> params) throws Exception {
}

private String handleDefaultCursorCloseRequest(Client client, DefaultCursor cursor) {
String scrollId = cursor.getScrollId();
ClearScrollResponse clearScrollResponse =
client.prepareClearScroll().addScrollId(scrollId).get();
if (clearScrollResponse.isSucceeded()) {
return SUCCEEDED_TRUE;
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
String pitId = cursor.getPitId();
PointInTimeHandler pit = new PointInTimeHandlerImpl(client, pitId);
try {
pit.delete();
return SUCCEEDED_TRUE;
} catch (RuntimeException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
return SUCCEEDED_FALSE;
}
} else {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
return SUCCEEDED_FALSE;
String scrollId = cursor.getScrollId();
ClearScrollResponse clearScrollResponse =
client.prepareClearScroll().addScrollId(scrollId).get();
if (clearScrollResponse.isSucceeded()) {
return SUCCEEDED_TRUE;
} else {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
return SUCCEEDED_FALSE;
}
}
}
}
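
CursorCloseExecutor now closes a PIT-backed cursor by deleting the point in time through PointInTimeHandler instead of clearing a scroll context. The handler implementation itself is not part of this diff, so the outline below is hypothetical: it shows what such a handler could do with the core PIT transport actions, and the CreatePitRequest/DeletePitRequest signatures are assumed from OpenSearch 2.x rather than taken from this change.

import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;

// Hypothetical outline of a PIT lifecycle helper; the real PointInTimeHandlerImpl
// is not shown in this diff and may differ. Transport action signatures are assumed.
final class PitLifecycleSketch {
  private final Client client;
  private String pitId;

  PitLifecycleSketch(Client client) {
    this.client = client;
  }

  void create(String index, TimeValue keepAlive) {
    // keepAlive would normally come from SQL_CURSOR_KEEP_ALIVE, per the commit message.
    CreatePitRequest request = new CreatePitRequest(keepAlive, false, index);
    CreatePitResponse response = client.execute(CreatePitAction.INSTANCE, request).actionGet();
    pitId = response.getId();
  }

  void delete() {
    // Deleting the PIT releases the search context, the PIT analogue of clear scroll.
    client.execute(DeletePitAction.INSTANCE, new DeletePitRequest(pitId)).actionGet();
  }
}
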
org/opensearch/sql/legacy/executor/cursor/CursorResultExecutor.java
@@ -6,6 +6,8 @@
package org.opensearch.sql.legacy.executor.cursor;

import static org.opensearch.core.rest.RestStatus.OK;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.util.Arrays;
import java.util.Map;
@@ -14,21 +16,25 @@
import org.json.JSONException;
import org.opensearch.OpenSearchException;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.legacy.cursor.CursorType;
import org.opensearch.sql.legacy.cursor.DefaultCursor;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.executor.Format;
import org.opensearch.sql.legacy.executor.format.Protocol;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl;
import org.opensearch.sql.legacy.rewriter.matchtoterm.VerificationException;

public class CursorResultExecutor implements CursorRestExecutor {
@@ -91,14 +97,27 @@ public String execute(Client client, Map<String, String> params) throws Exception {
}

private String handleDefaultCursorRequest(Client client, DefaultCursor cursor) {
String previousScrollId = cursor.getScrollId();
LocalClusterState clusterState = LocalClusterState.state();
TimeValue scrollTimeout = clusterState.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE);
SearchResponse scrollResponse =
client.prepareSearchScroll(previousScrollId).setScroll(scrollTimeout).get();
TimeValue paginationTimeout = clusterState.getSettingValue(SQL_CURSOR_KEEP_ALIVE);

SearchResponse scrollResponse = null;
if (clusterState.getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
String pitId = cursor.getPitId();
SearchSourceBuilder source = cursor.getSearchSourceBuilder();
source.searchAfter(cursor.getSortFields());
source.pointInTimeBuilder(new PointInTimeBuilder(pitId));
SearchRequest searchRequest = new SearchRequest();
searchRequest.source(source);
scrollResponse = client.search(searchRequest).actionGet();
} else {
String previousScrollId = cursor.getScrollId();
scrollResponse =
client.prepareSearchScroll(previousScrollId).setScroll(paginationTimeout).get();
}
SearchHits searchHits = scrollResponse.getHits();
SearchHit[] searchHitArray = searchHits.getHits();
String newScrollId = scrollResponse.getScrollId();
String newPitId = scrollResponse.pointInTimeId();

int rowsLeft = (int) cursor.getRowsLeft();
int fetch = cursor.getFetchSize();
@@ -120,16 +139,37 @@ private String handleDefaultCursorRequest(Client client, DefaultCursor cursor) {

if (rowsLeft <= 0) {
/** Clear the scroll context on last page */
ClearScrollResponse clearScrollResponse =
client.prepareClearScroll().addScrollId(newScrollId).get();
if (!clearScrollResponse.isSucceeded()) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
LOG.info("Error closing the cursor context {} ", newScrollId);
if (newScrollId != null) {
ClearScrollResponse clearScrollResponse =
client.prepareClearScroll().addScrollId(newScrollId).get();
if (!clearScrollResponse.isSucceeded()) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
LOG.info("Error closing the cursor context {} ", newScrollId);
}
}
if (newPitId != null) {
PointInTimeHandler pit = new PointInTimeHandlerImpl(client, newPitId);
try {
pit.delete();
} catch (RuntimeException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
LOG.info("Error deleting point in time {} ", newPitId);
}
}
}

cursor.setRowsLeft(rowsLeft);
cursor.setScrollId(newScrollId);
if (clusterState.getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
cursor.setPitId(newPitId);
cursor.setSearchSourceBuilder(cursor.getSearchSourceBuilder());
cursor.setSortFields(
scrollResponse
.getHits()
.getAt(scrollResponse.getHits().getHits().length - 1)
.getSortValues());
} else {
cursor.setScrollId(newScrollId);
}
Protocol protocol = new Protocol(client, searchHits, format.name().toLowerCase(), cursor);
return protocol.cursorFormat();
}
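
The PIT branch of handleDefaultCursorRequest is the core of the new pagination: it re-attaches the stored SearchSourceBuilder to the point in time, applies the previous page's sort values as the search_after marker, and records the last hit's sort values for the next page. A condensed sketch of that paging step, assuming the builder already carries the query, the page size, and a sort (search_after requires one), as prepared for the first page:

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;

// Sketch of one search_after + PIT paging step, mirroring the executor above.
final class PitPageFetcher {
  static Object[] fetchNextPage(
      Client client, SearchSourceBuilder source, String pitId, Object[] lastSortValues) {
    source.searchAfter(lastSortValues); // resume right after the previous page
    source.pointInTimeBuilder(new PointInTimeBuilder(pitId)); // PIT pins a consistent view
    SearchResponse response = client.search(new SearchRequest().source(source)).actionGet();

    SearchHits hits = response.getHits();
    if (hits.getHits().length == 0) {
      return null; // no more results; the caller deletes the PIT at this point
    }
    // The last hit's sort values become the search_after marker for the next page.
    return hits.getAt(hits.getHits().length - 1).getSortValues();
  }
}

Unlike scroll, each PIT page is an ordinary search against a pinned reader context, which is why the executor deletes the PIT explicitly once rowsLeft reaches zero instead of relying on a scroll timeout.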