From be8fe0a0ce562fe732fae12a0b236b5731e4638c Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Fri, 16 Aug 2019 15:39:36 +0300 Subject: [PATCH] SQL: Refactor away the cycle between Rowset and Cursor (#45516) * SQL: Refactor away the cycle between RowSet and Cursor Improve encapsulation of pagination of rowsets by breaking the cycle between cursor and associated rowset implementation, all logic now residing inside each cursor implementation. --- .../xpack/sql/execution/PlanExecutor.java | 7 +- .../search/CompositeAggregationCursor.java | 60 +++++--- .../execution/search/CompositeAggsRowSet.java | 21 ++- .../execution/search/PagingListRowSet.java | 46 ------ .../xpack/sql/execution/search/Querier.java | 133 +++++++----------- .../search/SchemaCompositeAggsRowSet.java | 7 +- .../search/SchemaSearchHitRowSet.java | 6 +- .../sql/execution/search/ScrollCursor.java | 54 +++++-- .../sql/execution/search/SearchHitRowSet.java | 38 +++-- .../xpack/sql/plan/logical/LocalRelation.java | 6 +- .../sql/plan/logical/command/Command.java | 8 ++ .../xpack/sql/plan/logical/command/Debug.java | 10 +- .../sql/plan/logical/command/Explain.java | 40 +++--- .../sql/plan/logical/command/ShowColumns.java | 7 +- .../plan/logical/command/ShowFunctions.java | 9 +- .../sql/plan/logical/command/ShowSchemas.java | 8 +- .../sql/plan/logical/command/ShowTables.java | 7 +- .../plan/logical/command/sys/SysColumns.java | 10 +- .../plan/logical/command/sys/SysTables.java | 12 +- .../plan/logical/command/sys/SysTypes.java | 7 +- .../xpack/sql/plan/physical/CommandExec.java | 10 +- .../xpack/sql/plan/physical/EsQueryExec.java | 4 +- .../xpack/sql/plan/physical/LocalExec.java | 6 +- .../xpack/sql/plan/physical/Unexecutable.java | 5 +- .../xpack/sql/plugin/TextFormatterCursor.java | 3 +- .../sql/plugin/TransportSqlQueryAction.java | 26 ++-- .../xpack/sql/session/Cursor.java | 25 +++- .../xpack/sql/session/Cursors.java | 3 +- .../xpack/sql/session/EmptyCursor.java | 5 +- .../xpack/sql/session/EmptyExecutable.java | 5 +- .../xpack/sql/session/EmptyRowSet.java | 5 - .../xpack/sql/session/Executable.java | 7 +- .../ListCursor.java} | 38 +++-- .../xpack/sql/session/ListRowSet.java | 17 ++- .../xpack/sql/session/RowSet.java | 5 - .../sql/session/SingletonExecutable.java | 10 +- .../xpack/sql/session/SingletonRowSet.java | 5 - .../xpack/sql/session/SqlSession.java | 3 +- .../logical/command/sys/SysColumnsTests.java | 2 +- .../logical/command/sys/SysTablesTests.java | 2 +- .../logical/command/sys/SysTypesTests.java | 20 ++- .../ListCursorTests.java} | 27 ++-- 42 files changed, 377 insertions(+), 352 deletions(-) delete mode 100644 x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListRowSet.java rename x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/{execution/search/PagingListCursor.java => session/ListCursor.java} (64%) rename x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/{execution/search/PagingListCursorTests.java => session/ListCursorTests.java} (63%) diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java index 815c85b7fed8b..ca394bf11d881 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java @@ -23,8 +23,7 @@ import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue; import org.elasticsearch.xpack.sql.session.Configuration; import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.session.RowSet; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.stats.Metrics; import org.elasticsearch.xpack.sql.stats.QueryMetric; @@ -91,7 +90,7 @@ public void searchSource(Configuration cfg, String sql, List }, listener::onFailure)); } - public void sql(Configuration cfg, String sql, List params, ActionListener listener) { + public void sql(Configuration cfg, String sql, List params, ActionListener listener) { QueryMetric metric = QueryMetric.from(cfg.mode(), cfg.clientId()); metrics.total(metric); @@ -101,7 +100,7 @@ public void sql(Configuration cfg, String sql, List params, })); } - public void nextPage(Configuration cfg, Cursor cursor, ActionListener listener) { + public void nextPage(Configuration cfg, Cursor cursor, ActionListener listener) { QueryMetric metric = QueryMetric.from(cfg.mode(), cfg.clientId()); metrics.total(metric); metrics.paging(metric); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java index 26c1a690a4000..76c05c4f6a1b7 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java @@ -27,7 +27,8 @@ import org.elasticsearch.xpack.sql.querydsl.agg.Aggs; import org.elasticsearch.xpack.sql.session.Configuration; import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.session.RowSet; +import org.elasticsearch.xpack.sql.session.Rows; +import org.elasticsearch.xpack.sql.type.Schema; import org.elasticsearch.xpack.sql.util.StringUtils; import java.io.IOException; @@ -36,6 +37,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Function; /** * Cursor for composite aggregation (GROUP BY). @@ -116,7 +119,7 @@ boolean includeFrozen() { } @Override - public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { + public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { SearchSourceBuilder q; try { q = deserializeQuery(registry, nextQuery); @@ -135,21 +138,11 @@ public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry re client.search(search, new ActionListener<>() { @Override public void onResponse(SearchResponse r) { - try { - // retry - if (shouldRetryDueToEmptyPage(r)) { - CompositeAggregationCursor.updateCompositeAfterKey(r, search.source()); - client.search(search, this); - return; - } - - boolean hasAfterKey = updateCompositeAfterKey(r, query); - CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit, - hasAfterKey ? serializeQuery(query) : null, includeFrozen, indices); - listener.onResponse(rowSet); - } catch (Exception ex) { - listener.onFailure(ex); - } + handle(r, search.source(), ba -> new CompositeAggsRowSet(extractors, mask, r, limit, ba), + () -> client.search(search, this), + p -> listener.onResponse(p), + e -> listener.onFailure(e), + Schema.EMPTY, includeFrozen, indices); } @Override @@ -159,6 +152,39 @@ public void onFailure(Exception ex) { }); } + static void handle(SearchResponse response, SearchSourceBuilder source, Function makeRowSet, + Runnable retry, Consumer onPage, Consumer onFailure, + Schema schema, boolean includeFrozen, String[] indices) { + + // there are some results + if (response.getAggregations().asList().isEmpty() == false) { + // retry + if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) { + CompositeAggregationCursor.updateCompositeAfterKey(response, source); + retry.run(); + return; + } + + try { + boolean hasAfterKey = updateCompositeAfterKey(response, source); + byte[] queryAsBytes = hasAfterKey ? serializeQuery(source) : null; + CompositeAggsRowSet rowSet = makeRowSet.apply(queryAsBytes); + + Cursor next = rowSet.remainingData() == 0 + ? Cursor.EMPTY + : new CompositeAggregationCursor(queryAsBytes, rowSet.extractors(), rowSet.mask(), + rowSet.remainingData(), includeFrozen, indices); + onPage.accept(new Page(rowSet, next)); + } catch (Exception ex) { + onFailure.accept(ex); + } + } + // no results + else { + onPage.accept(Page.last(Rows.empty(schema))); + } + } + static boolean shouldRetryDueToEmptyPage(SearchResponse response) { CompositeAggregation composite = getComposite(response); // if there are no buckets but a next page, go fetch it instead of sending an empty response to the client diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java index 88b93359d0f77..dd6b85279cb2d 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java @@ -8,7 +8,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractor; -import org.elasticsearch.xpack.sql.session.Cursor; import org.elasticsearch.xpack.sql.session.RowSet; import java.util.BitSet; @@ -22,14 +21,11 @@ class CompositeAggsRowSet extends ResultRowSet { private final List buckets; - - private final Cursor cursor; - + private final int remainingData; private final int size; private int row = 0; - CompositeAggsRowSet(List exts, BitSet mask, SearchResponse response, - int limit, byte[] next, boolean includeFrozen, String... indices) { + CompositeAggsRowSet(List exts, BitSet mask, SearchResponse response, int limit, byte[] next) { super(exts, mask); CompositeAggregation composite = CompositeAggregationCursor.getComposite(response); @@ -43,7 +39,7 @@ class CompositeAggsRowSet extends ResultRowSet { size = limit == -1 ? buckets.size() : Math.min(buckets.size(), limit); if (next == null) { - cursor = Cursor.EMPTY; + remainingData = 0; } else { // Compute remaining limit @@ -56,9 +52,9 @@ class CompositeAggsRowSet extends ResultRowSet { // however the Querier takes care of that and keeps making requests until either the query is invalid or at least one response // is returned. if (size == 0 || remainingLimit == 0) { - cursor = Cursor.EMPTY; + remainingData = 0; } else { - cursor = new CompositeAggregationCursor(next, exts, mask, remainingLimit, includeFrozen, indices); + remainingData = remainingLimit; } } } @@ -92,8 +88,7 @@ public int size() { return size; } - @Override - public Cursor nextPageCursor() { - return cursor; + int remainingData() { + return remainingData; } -} +} \ No newline at end of file diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListRowSet.java deleted file mode 100644 index 73da15255f0ff..0000000000000 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListRowSet.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.sql.execution.search; - -import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.session.ListRowSet; -import org.elasticsearch.xpack.sql.type.Schema; - -import java.util.List; - -class PagingListRowSet extends ListRowSet { - - private final int pageSize; - private final int columnCount; - private final Cursor cursor; - - PagingListRowSet(List> list, int columnCount, int pageSize) { - this(Schema.EMPTY, list, columnCount, pageSize); - } - - PagingListRowSet(Schema schema, List> list, int columnCount, int pageSize) { - super(schema, list); - this.columnCount = columnCount; - this.pageSize = Math.min(pageSize, list.size()); - this.cursor = list.size() > pageSize ? new PagingListCursor(list, columnCount, pageSize) : Cursor.EMPTY; - } - - @Override - public int size() { - return pageSize; - } - - @Override - public int columnCount() { - return columnCount; - } - - @Override - public Cursor nextPageCursor() { - return cursor; - } -} diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java index 7949505d162f6..10b4d8663ef60 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java @@ -20,7 +20,6 @@ import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.MultiBucketConsumerService; @@ -57,6 +56,8 @@ import org.elasticsearch.xpack.sql.querydsl.container.TopHitsAggRef; import org.elasticsearch.xpack.sql.session.Configuration; import org.elasticsearch.xpack.sql.session.Cursor; +import org.elasticsearch.xpack.sql.session.Cursor.Page; +import org.elasticsearch.xpack.sql.session.ListCursor; import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.SchemaRowSet; @@ -75,6 +76,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.singletonList; +import static org.elasticsearch.action.ActionListener.wrap; // TODO: add retry/back-off public class Querier { @@ -98,7 +100,7 @@ public Querier(SqlSession sqlSession) { this.size = cfg.pageSize(); } - public void query(List output, QueryContainer query, String index, ActionListener listener) { + public void query(List output, QueryContainer query, String index, ActionListener listener) { // prepare the request SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(query, filter, size); // set query timeout @@ -152,22 +154,19 @@ public static SearchRequest prepareRequest(Client client, SearchSourceBuilder so * results back to the client. */ @SuppressWarnings("rawtypes") - class LocalAggregationSorterListener implements ActionListener { + class LocalAggregationSorterListener implements ActionListener { - private final ActionListener listener; + private final ActionListener listener; // keep the top N entries. private final AggSortingQueue data; private final AtomicInteger counter = new AtomicInteger(); private volatile Schema schema; - /** - * Match the default value for {@link MultiBucketConsumerService#MAX_BUCKET_SETTING} - */ - private static final int MAXIMUM_SIZE = 10_000; + private static final int MAXIMUM_SIZE = MultiBucketConsumerService.DEFAULT_MAX_BUCKETS; private final boolean noLimit; - LocalAggregationSorterListener(ActionListener listener, List> sortingColumns, int limit) { + LocalAggregationSorterListener(ActionListener listener, List> sortingColumns, int limit) { this.listener = listener; int size = MAXIMUM_SIZE; @@ -186,20 +185,26 @@ class LocalAggregationSorterListener implements ActionListener { } @Override - public void onResponse(SchemaRowSet schemaRowSet) { - schema = schemaRowSet.schema(); - doResponse(schemaRowSet); - } + public void onResponse(Page page) { + // schema is set on the first page (as the rest don't hold the schema anymore) + if (schema == null) { + RowSet rowSet = page.rowSet(); + if (rowSet instanceof SchemaRowSet) { + schema = ((SchemaRowSet) rowSet).schema(); + } else { + onFailure(new SqlIllegalArgumentException("No schema found inside {}", rowSet.getClass())); + return; + } + } - private void doResponse(RowSet rowSet) { // 1. consume all pages received - consumeRowSet(rowSet); + consumeRowSet(page.rowSet()); - Cursor cursor = rowSet.nextPageCursor(); + Cursor cursor = page.next(); // 1a. trigger a next call if there's still data if (cursor != Cursor.EMPTY) { // trigger a next call - planExecutor.nextPage(cfg, cursor, ActionListener.wrap(this::doResponse, this::onFailure)); + planExecutor.nextPage(cfg, cursor, this); // make sure to bail out afterwards as we'll get called by a different thread return; } @@ -223,7 +228,7 @@ private void consumeRowSet(RowSet rowSet) { } private void sendResponse() { - listener.onResponse(new PagingListRowSet(schema, data.asList(), schema.size(), cfg.pageSize())); + listener.onResponse(ListCursor.of(schema, data.asList(), cfg.pageSize())); } @Override @@ -265,13 +270,13 @@ public Aggregations getAggregations() { } }); - ImplicitGroupActionListener(ActionListener listener, Client client, Configuration cfg, List output, + ImplicitGroupActionListener(ActionListener listener, Client client, Configuration cfg, List output, QueryContainer query, SearchRequest request) { super(listener, client, cfg, output, query, request); } @Override - protected void handleResponse(SearchResponse response, ActionListener listener) { + protected void handleResponse(SearchResponse response, ActionListener listener) { Aggregations aggs = response.getAggregations(); if (aggs != null) { Aggregation agg = aggs.get(Aggs.ROOT_GROUP_NAME); @@ -298,10 +303,10 @@ private void handleBuckets(List buckets, SearchResponse respon for (int i = mask.nextSetBit(0); i >= 0; i = mask.nextSetBit(i + 1)) { values[index++] = extractors.get(i).extract(implicitGroup); } - listener.onResponse(Rows.singleton(schema, values)); + listener.onResponse(Page.last(Rows.singleton(schema, values))); } else if (buckets.isEmpty()) { - listener.onResponse(Rows.empty(schema)); + listener.onResponse(Page.last(Rows.empty(schema))); } else { throw new SqlIllegalArgumentException("Too many groups returned by the implicit group; expected 1, received {}", @@ -316,43 +321,21 @@ private void handleBuckets(List buckets, SearchResponse respon */ static class CompositeActionListener extends BaseAggActionListener { - CompositeActionListener(ActionListener listener, Client client, Configuration cfg, + CompositeActionListener(ActionListener listener, Client client, Configuration cfg, List output, QueryContainer query, SearchRequest request) { super(listener, client, cfg, output, query, request); } @Override - protected void handleResponse(SearchResponse response, ActionListener listener) { - // there are some results - if (response.getAggregations().asList().isEmpty() == false) { - - // retry - if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) { - CompositeAggregationCursor.updateCompositeAfterKey(response, request.source()); - client.search(request, this); - return; - } - - CompositeAggregationCursor.updateCompositeAfterKey(response, request.source()); - byte[] nextSearch; - try { - nextSearch = CompositeAggregationCursor.serializeQuery(request.source()); - } catch (Exception ex) { - listener.onFailure(ex); - return; - } - - listener.onResponse( - new SchemaCompositeAggsRowSet(schema, initBucketExtractors(response), mask, response, - query.sortingColumns().isEmpty() ? query.limit() : -1, - nextSearch, - query.shouldIncludeFrozen(), - request.indices())); - } - // no results - else { - listener.onResponse(Rows.empty(schema)); - } + protected void handleResponse(SearchResponse response, ActionListener listener) { + + CompositeAggregationCursor.handle(response, request.source(), + ba -> new SchemaCompositeAggsRowSet(schema, initBucketExtractors(response), mask, response, + query.sortingColumns().isEmpty() ? query.limit() : -1, ba), + () -> client.search(request, this), + p -> listener.onResponse(p), + e -> listener.onFailure(e), + schema, query.shouldIncludeFrozen(), request.indices()); } } @@ -361,7 +344,7 @@ abstract static class BaseAggActionListener extends BaseActionListener { final SearchRequest request; final BitSet mask; - BaseAggActionListener(ActionListener listener, Client client, Configuration cfg, List output, + BaseAggActionListener(ActionListener listener, Client client, Configuration cfg, List output, QueryContainer query, SearchRequest request) { super(listener, client, cfg, output); @@ -426,7 +409,7 @@ static class ScrollActionListener extends BaseActionListener { private final BitSet mask; private final boolean multiValueFieldLeniency; - ScrollActionListener(ActionListener listener, Client client, Configuration cfg, + ScrollActionListener(ActionListener listener, Client client, Configuration cfg, List output, QueryContainer query) { super(listener, client, cfg, output); this.query = query; @@ -435,9 +418,7 @@ static class ScrollActionListener extends BaseActionListener { } @Override - protected void handleResponse(SearchResponse response, ActionListener listener) { - SearchHit[] hits = response.getHits().getHits(); - + protected void handleResponse(SearchResponse response, ActionListener listener) { // create response extractors for the first time List> refs = query.fields(); @@ -446,30 +427,10 @@ protected void handleResponse(SearchResponse response, ActionListener 0) { - String scrollId = response.getScrollId(); - SchemaSearchHitRowSet hitRowSet = new SchemaSearchHitRowSet(schema, exts, mask, hits, query.limit(), scrollId); - - // if there's an id, try to setup next scroll - if (scrollId != null && - // is all the content already retrieved? - (Boolean.TRUE.equals(response.isTerminatedEarly()) - || response.getHits().getTotalHits().value == hits.length - || hitRowSet.isLimitReached())) { - // if so, clear the scroll - clear(response.getScrollId(), ActionListener.wrap( - succeeded -> listener.onResponse(new SchemaSearchHitRowSet(schema, exts, mask, hits, query.limit(), null)), - listener::onFailure)); - } else { - listener.onResponse(hitRowSet); - } - } - // no hits - else { - clear(response.getScrollId(), ActionListener.wrap(succeeded -> listener.onResponse(Rows.empty(schema)), - listener::onFailure)); - } + ScrollCursor.handle(response, () -> new SchemaSearchHitRowSet(schema, exts, mask, query.limit(), response), + p -> listener.onResponse(p), + p -> clear(response.getScrollId(), wrap(success -> listener.onResponse(p), listener::onFailure)), + schema); } private HitExtractor createExtractor(FieldExtraction ref) { @@ -515,14 +476,14 @@ private HitExtractor createExtractor(FieldExtraction ref) { */ abstract static class BaseActionListener implements ActionListener { - final ActionListener listener; + final ActionListener listener; final Client client; final Configuration cfg; final TimeValue keepAlive; final Schema schema; - BaseActionListener(ActionListener listener, Client client, Configuration cfg, List output) { + BaseActionListener(ActionListener listener, Client client, Configuration cfg, List output) { this.listener = listener; this.client = client; @@ -546,7 +507,7 @@ public void onResponse(final SearchResponse response) { } } - protected abstract void handleResponse(SearchResponse response, ActionListener listener); + protected abstract void handleResponse(SearchResponse response, ActionListener listener); // clean-up the scroll in case of exception protected final void cleanup(SearchResponse response, Exception ex) { diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java index 3ec4ff6b11427..7eeb8b28f1542 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java @@ -22,11 +22,8 @@ class SchemaCompositeAggsRowSet extends CompositeAggsRowSet implements SchemaRow private final Schema schema; - SchemaCompositeAggsRowSet(Schema schema, List exts, BitSet mask, SearchResponse response, int limitAggs, - byte[] next, - boolean includeFrozen, - String... indices) { - super(exts, mask, response, limitAggs, next, includeFrozen, indices); + SchemaCompositeAggsRowSet(Schema schema, List exts, BitSet mask, SearchResponse r, int limitAggs, byte[] next) { + super(exts, mask, r, limitAggs, next); this.schema = schema; } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaSearchHitRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaSearchHitRowSet.java index aa5c57aab609d..7ba7a06fd8a37 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaSearchHitRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaSearchHitRowSet.java @@ -5,7 +5,7 @@ */ package org.elasticsearch.xpack.sql.execution.search; -import org.elasticsearch.search.SearchHit; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.type.Schema; @@ -21,8 +21,8 @@ class SchemaSearchHitRowSet extends SearchHitRowSet implements SchemaRowSet { private final Schema schema; - SchemaSearchHitRowSet(Schema schema, List exts, BitSet mask, SearchHit[] hits, int limitHits, String scrollId) { - super(exts, mask, hits, limitHits, scrollId); + SchemaSearchHitRowSet(Schema schema, List exts, BitSet mask, int limitHits, SearchResponse response) { + super(exts, mask, limitHits, response); this.schema = schema; } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java index af57126cc5610..55f78db40739d 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java @@ -14,18 +14,25 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; import org.elasticsearch.xpack.sql.session.Configuration; import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.session.RowSet; +import org.elasticsearch.xpack.sql.session.Rows; +import org.elasticsearch.xpack.sql.type.Schema; import java.io.IOException; import java.util.BitSet; import java.util.List; import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.elasticsearch.action.ActionListener.wrap; public class ScrollCursor implements Cursor { @@ -83,29 +90,48 @@ int limit() { return limit; } @Override - public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { - log.trace("About to execute scroll query {}", scrollId); + public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { + if (log.isTraceEnabled()) { + log.trace("About to execute scroll query {}", scrollId); + } SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(cfg.pageTimeout()); - client.searchScroll(request, ActionListener.wrap((SearchResponse response) -> { - SearchHitRowSet rowSet = new SearchHitRowSet(extractors, mask, response.getHits().getHits(), - limit, response.getScrollId()); - if (rowSet.nextPageCursor() == Cursor.EMPTY ) { - // we are finished with this cursor, let's clean it before continuing - clear(cfg, client, ActionListener.wrap(success -> listener.onResponse(rowSet), listener::onFailure)); - } else { - listener.onResponse(rowSet); - } + client.searchScroll(request, wrap(response -> { + handle(response, () -> new SearchHitRowSet(extractors, mask, limit, response), + p -> listener.onResponse(p), + p -> clear(cfg, client, wrap(success -> listener.onResponse(p), listener::onFailure)), + Schema.EMPTY); }, listener::onFailure)); } @Override public void clear(Configuration cfg, Client client, ActionListener listener) { - cleanCursor(client, scrollId, - ActionListener.wrap( + cleanCursor(client, scrollId, wrap( clearScrollResponse -> listener.onResponse(clearScrollResponse.isSucceeded()), listener::onFailure)); } + + static void handle(SearchResponse response, Supplier makeRowHit, Consumer onPage, Consumer clearScroll, + Schema schema) { + SearchHit[] hits = response.getHits().getHits(); + // clean-up + if (hits.length > 0) { + SearchHitRowSet rowSet = makeRowHit.get(); + Tuple nextScrollData = rowSet.nextScrollData(); + + if (nextScrollData == null) { + // no more data, let's clean the scroll before continuing + clearScroll.accept(Page.last(rowSet)); + } else { + Cursor next = new ScrollCursor(nextScrollData.v1(), rowSet.extractors(), rowSet.mask(), nextScrollData.v2()); + onPage.accept(new Page(rowSet, next)); + } + } + // no-hits + else { + clearScroll.accept(Page.last(Rows.empty(schema))); + } + } @Override public boolean equals(Object obj) { diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java index 97b369a7a12ee..5a11bfde64561 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java @@ -5,11 +5,13 @@ */ package org.elasticsearch.xpack.sql.execution.search; +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; -import org.elasticsearch.xpack.sql.session.Cursor; import java.util.ArrayList; import java.util.Arrays; @@ -29,18 +31,19 @@ class SearchHitRowSet extends ResultRowSet { private final SearchHit[] hits; private final Map> flatInnerHits = new HashMap<>(); - private final Cursor cursor; private final Set innerHits = new LinkedHashSet<>(); private final String innerHit; private final int size; private final int[] indexPerLevel; + private final Tuple nextScrollData; + private int row = 0; - SearchHitRowSet(List exts, BitSet mask, SearchHit[] hits, int limit, String scrollId) { + SearchHitRowSet(List exts, BitSet mask, int limit, SearchResponse response) { super(exts, mask); - this.hits = hits; + this.hits = response.getHits().getHits(); // Since the results might contain nested docs, the iteration is similar to that of Aggregation // namely it discovers the nested docs and then, for iteration, increments the deepest level first @@ -81,24 +84,30 @@ class SearchHitRowSet extends ResultRowSet { indexPerLevel = new int[maxDepth + 1]; this.innerHit = innerHit; + String scrollId = response.getScrollId(); + if (scrollId == null) { /* SearchResponse can contain a null scroll when you start a * scroll but all results fit in the first page. */ - cursor = Cursor.EMPTY; + nextScrollData = null; } else { + TotalHits totalHits = response.getHits().getTotalHits(); + // compute remaining limit (only if the limit is specified - that is, positive). int remainingLimit = limit < 0 ? limit : limit - size; // if the computed limit is zero, or the size is zero it means either there's nothing left or the limit has been reached - if (size == 0 || remainingLimit == 0) { - cursor = Cursor.EMPTY; + if (size == 0 || remainingLimit == 0 + // or the scroll has ended + || totalHits != null && totalHits.value == hits.length) { + nextScrollData = null; } else { - cursor = new ScrollCursor(scrollId, extractors(), mask, remainingLimit); + nextScrollData = new Tuple<>(scrollId, remainingLimit); } } } protected boolean isLimitReached() { - return cursor == Cursor.EMPTY; + return nextScrollData == null; } @Override @@ -131,8 +140,8 @@ private SearchHit[] getAllInnerHits(SearchHit hit, String path) { int endOfPath = entry.getKey().lastIndexOf('_'); if (endOfPath >= 0 && entry.getKey().substring(0, endOfPath).equals(path)) { SearchHit[] h = entry.getValue().getHits(); - for (int i = 0; i < h.length; i++) { - lhm.put(h[i].getNestedIdentity().getOffset(), h[i]); + for (SearchHit element : h) { + lhm.put(element.getNestedIdentity().getOffset(), element); } } } @@ -146,7 +155,7 @@ private SearchHit[] getAllInnerHits(SearchHit hit, String path) { } private class NestedHitOffsetComparator implements Comparator { - @Override + @Override public int compare(SearchHit sh1, SearchHit sh2) { if (sh1 == null && sh2 == null) { return 0; @@ -210,8 +219,7 @@ public int size() { return size; } - @Override - public Cursor nextPageCursor() { - return cursor; + Tuple nextScrollData() { + return nextScrollData; } } \ No newline at end of file diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/LocalRelation.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/LocalRelation.java index 53a485a3b0542..9abe6fef3d4bb 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/LocalRelation.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/LocalRelation.java @@ -7,11 +7,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Executable; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.NodeInfo; +import org.elasticsearch.xpack.sql.tree.Source; import java.util.List; import java.util.Objects; @@ -52,7 +52,7 @@ public List output() { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { executable.execute(session, listener); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Command.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Command.java index 72ae456a33b77..0b9766345869d 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Command.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Command.java @@ -7,7 +7,11 @@ import org.elasticsearch.xpack.sql.expression.FieldAttribute; import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Executable; +import org.elasticsearch.xpack.sql.session.ListCursor; +import org.elasticsearch.xpack.sql.session.Rows; +import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.type.DataType; import org.elasticsearch.xpack.sql.type.EsField; @@ -51,4 +55,8 @@ protected final FieldAttribute field(String name, DataType type) { private FieldAttribute field(String name, EsField field) { return new FieldAttribute(source(), name, field); } + + protected Page of(SqlSession session, List> values) { + return ListCursor.of(Rows.schema(output()), values, session.configuration().pageSize()); + } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Debug.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Debug.java index f4aa378bbce75..eda730e8adbb0 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Debug.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Debug.java @@ -12,13 +12,13 @@ import org.elasticsearch.xpack.sql.rule.RuleExecutor.Batch; import org.elasticsearch.xpack.sql.rule.RuleExecutor.ExecutionInfo; import org.elasticsearch.xpack.sql.rule.RuleExecutor.Transformation; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.Node; import org.elasticsearch.xpack.sql.tree.NodeInfo; import org.elasticsearch.xpack.sql.tree.NodeUtils; +import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.type.KeywordEsField; import org.elasticsearch.xpack.sql.util.Graphviz; @@ -75,7 +75,7 @@ public List output() { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { switch (type) { case ANALYZED: session.debugAnalyzedPlan(plan, wrap(i -> handleInfo(i, listener), listener::onFailure)); @@ -90,7 +90,7 @@ public void execute(SqlSession session, ActionListener listener) { } @SuppressWarnings({ "rawtypes", "unchecked" }) - private void handleInfo(ExecutionInfo info, ActionListener listener) { + private void handleInfo(ExecutionInfo info, ActionListener listener) { String planString = null; if (format == Format.TEXT) { @@ -135,7 +135,7 @@ private void handleInfo(ExecutionInfo info, ActionListener listene } } - listener.onResponse(Rows.singleton(output(), planString)); + listener.onResponse(Page.last(Rows.singleton(output(), planString))); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Explain.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Explain.java index c6904a87f3f4a..d3eac1cd6bbb3 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Explain.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Explain.java @@ -13,11 +13,11 @@ import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.sql.planner.Planner; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.NodeInfo; +import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.type.KeywordEsField; import org.elasticsearch.xpack.sql.util.Graphviz; @@ -85,10 +85,10 @@ public List output() { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { if (type == Type.PARSED) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, plan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, plan)))); return; } @@ -96,7 +96,7 @@ public void execute(SqlSession session, ActionListener listener) { session.analyzedPlan(plan, verify, wrap(analyzedPlan -> { if (type == Type.ANALYZED) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, analyzedPlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, analyzedPlan)))); return; } @@ -105,25 +105,25 @@ public void execute(SqlSession session, ActionListener listener) { if (verify) { session.optimizedPlan(analyzedPlan, wrap(optimizedPlan -> { if (type == Type.OPTIMIZED) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, optimizedPlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, optimizedPlan)))); return; } PhysicalPlan mappedPlan = planner.mapPlan(optimizedPlan, verify); if (type == Type.MAPPED) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, mappedPlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, mappedPlan)))); return; } PhysicalPlan executablePlan = planner.foldPlan(mappedPlan, verify); if (type == Type.EXECUTABLE) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, executablePlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, executablePlan)))); return; } // Type.All - listener.onResponse(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan, - mappedPlan, executablePlan))); + listener.onResponse(Page.last( + Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan, mappedPlan, executablePlan)))); }, listener::onFailure)); } @@ -133,14 +133,14 @@ public void execute(SqlSession session, ActionListener listener) { if (session.verifier().verifyFailures(analyzedPlan).isEmpty()) { session.optimizedPlan(analyzedPlan, wrap(optimizedPlan -> { if (type == Type.OPTIMIZED) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, optimizedPlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, optimizedPlan)))); return; } PhysicalPlan mappedPlan = planner.mapPlan(optimizedPlan, verify); if (type == Type.MAPPED) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, mappedPlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, mappedPlan)))); return; } @@ -148,30 +148,30 @@ public void execute(SqlSession session, ActionListener listener) { PhysicalPlan executablePlan = planner.foldPlan(mappedPlan, verify); if (type == Type.EXECUTABLE) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, executablePlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, executablePlan)))); return; } - listener.onResponse(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan, - mappedPlan, executablePlan))); + listener.onResponse(Page.last(Rows.singleton(output(), + printPlans(format, plan, analyzedPlan, optimizedPlan, mappedPlan, executablePlan)))); return; } // mapped failed if (type != Type.ALL) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, mappedPlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, mappedPlan)))); return; } - listener.onResponse(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan, - mappedPlan, null))); + listener.onResponse(Page + .last(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan, mappedPlan, null)))); }, listener::onFailure)); // cannot continue } else { if (type != Type.ALL) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, analyzedPlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, analyzedPlan)))); } else { - listener.onResponse(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, null, null, null))); + listener.onResponse(Page.last(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, null, null, null)))); } } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java index 7cddc3fc0a7e9..33643fa0f9ff0 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java @@ -9,8 +9,7 @@ import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.FieldAttribute; import org.elasticsearch.xpack.sql.expression.predicate.regex.LikePattern; -import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.NodeInfo; import org.elasticsearch.xpack.sql.tree.Source; @@ -61,7 +60,7 @@ public List output() { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { String idx = index != null ? index : (pattern != null ? pattern.asIndexNameWildcard() : "*"); String regex = pattern != null ? pattern.asJavaRegex() : null; @@ -73,7 +72,7 @@ public void execute(SqlSession session, ActionListener listener) { rows = new ArrayList<>(); fillInRows(indexResult.get().mapping(), null, rows); } - listener.onResponse(Rows.of(output(), rows)); + listener.onResponse(of(session, rows)); }, listener::onFailure)); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowFunctions.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowFunctions.java index f0e5d3fced0ae..e30c252fe3247 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowFunctions.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowFunctions.java @@ -11,11 +11,10 @@ import org.elasticsearch.xpack.sql.expression.function.FunctionDefinition; import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry; import org.elasticsearch.xpack.sql.expression.predicate.regex.LikePattern; -import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.SqlSession; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.NodeInfo; +import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.type.KeywordEsField; import java.util.Collection; @@ -50,11 +49,11 @@ public List output() { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { FunctionRegistry registry = session.functionRegistry(); Collection functions = registry.listFunctions(pattern != null ? pattern.asJavaRegex() : null); - listener.onResponse(Rows.of(output(), functions.stream() + listener.onResponse(of(session, functions.stream() .map(f -> asList(f.name(), f.type().name())) .collect(toList()))); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSchemas.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSchemas.java index 9d684b3ca456c..6ebcfb2b16b20 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSchemas.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSchemas.java @@ -8,11 +8,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.FieldAttribute; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.NodeInfo; +import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.type.KeywordEsField; import java.util.List; @@ -36,8 +36,8 @@ public List output() { } @Override - public void execute(SqlSession session, ActionListener listener) { - listener.onResponse(Rows.empty(output())); + public void execute(SqlSession session, ActionListener listener) { + listener.onResponse(Page.last(Rows.empty(output()))); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java index 8efc7d84377f8..4cdeae3ef5010 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java @@ -9,8 +9,7 @@ import org.elasticsearch.xpack.sql.analysis.index.IndexResolver.IndexType; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.predicate.regex.LikePattern; -import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.NodeInfo; import org.elasticsearch.xpack.sql.tree.Source; @@ -54,7 +53,7 @@ public List output() { } @Override - public final void execute(SqlSession session, ActionListener listener) { + public final void execute(SqlSession session, ActionListener listener) { String idx = index != null ? index : (pattern != null ? pattern.asIndexNameWildcard() : "*"); String regex = pattern != null ? pattern.asJavaRegex() : null; @@ -63,7 +62,7 @@ public final void execute(SqlSession session, ActionListener liste IndexType.VALID_INCLUDE_FROZEN : IndexType.VALID_REGULAR; session.indexResolver().resolveNames(idx, regex, withFrozen, ActionListener.wrap(result -> { - listener.onResponse(Rows.of(output(), result.stream() + listener.onResponse(of(session, result.stream() .map(t -> asList(t.name(), t.type().toSql(), t.type().toNative())) .collect(toList()))); }, listener::onFailure)); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumns.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumns.java index 16d6eae924e36..a1bb62b00215f 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumns.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumns.java @@ -13,8 +13,8 @@ import org.elasticsearch.xpack.sql.expression.predicate.regex.LikePattern; import org.elasticsearch.xpack.sql.plan.logical.command.Command; import org.elasticsearch.xpack.sql.proto.Mode; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.NodeInfo; import org.elasticsearch.xpack.sql.tree.Source; @@ -97,14 +97,14 @@ private List output(boolean odbcCompatible) { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { Mode mode = session.configuration().mode(); List output = output(mode == Mode.ODBC); String cluster = session.indexResolver().clusterName(); // bail-out early if the catalog is present but differs if (Strings.hasText(catalog) && cluster.equals(catalog) == false) { - listener.onResponse(Rows.empty(output)); + listener.onResponse(Page.last(Rows.empty(output))); return; } @@ -125,7 +125,7 @@ public void execute(SqlSession session, ActionListener listener) { fillInRows(cluster, esIndex.name(), esIndex.mapping(), null, rows, columnMatcher, mode); } - listener.onResponse(Rows.of(output, rows)); + listener.onResponse(of(session, rows)); }, listener::onFailure)); } // otherwise use a merged mapping @@ -138,7 +138,7 @@ public void execute(SqlSession session, ActionListener listener) { fillInRows(cluster, indexName, esIndex.mapping(), null, rows, columnMatcher, mode); } - listener.onResponse(Rows.of(output, rows)); + listener.onResponse(of(session, rows)); }, listener::onFailure)); } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTables.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTables.java index 111b392adb6b7..a3b8f1817415d 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTables.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTables.java @@ -11,8 +11,8 @@ import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.predicate.regex.LikePattern; import org.elasticsearch.xpack.sql.plan.logical.command.Command; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.NodeInfo; import org.elasticsearch.xpack.sql.tree.Source; @@ -70,7 +70,7 @@ public List output() { } @Override - public final void execute(SqlSession session, ActionListener listener) { + public final void execute(SqlSession session, ActionListener listener) { String cluster = session.indexResolver().clusterName(); // first check if where dealing with ODBC enumeration @@ -85,7 +85,7 @@ public final void execute(SqlSession session, ActionListener liste Object[] enumeration = new Object[10]; // send only the cluster, everything else null enumeration[0] = cluster; - listener.onResponse(Rows.singleton(output(), enumeration)); + listener.onResponse(Page.last(Rows.singleton(output(), enumeration))); return; } } @@ -111,7 +111,7 @@ public final void execute(SqlSession session, ActionListener liste } values.sort(Comparator.comparing(l -> l.get(3).toString())); - listener.onResponse(Rows.of(output(), values)); + listener.onResponse(of(session, values)); return; } } @@ -122,7 +122,7 @@ public final void execute(SqlSession session, ActionListener liste // if the catalog doesn't match, don't return any results if (cRegex != null && !Pattern.matches(cRegex, cluster)) { - listener.onResponse(Rows.empty(output())); + listener.onResponse(Page.last(Rows.empty(output()))); return; } @@ -141,7 +141,7 @@ public final void execute(SqlSession session, ActionListener liste } session.indexResolver().resolveNames(idx, regex, tableTypes, ActionListener.wrap(result -> listener.onResponse( - Rows.of(output(), result.stream() + of(session, result.stream() // sort by type (which might be legacy), then by name .sorted(Comparator. comparing(i -> legacyName(i.type())) .thenComparing(Comparator.comparing(i -> i.name()))) diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypes.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypes.java index 2112128b41b00..95ba2346c3ef4 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypes.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypes.java @@ -8,8 +8,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.plan.logical.command.Command; -import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.NodeInfo; import org.elasticsearch.xpack.sql.tree.Source; @@ -72,7 +71,7 @@ public List output() { } @Override - public final void execute(SqlSession session, ActionListener listener) { + public final void execute(SqlSession session, ActionListener listener) { Stream values = Stream.of(DataType.values()); if (type.intValue() != 0) { values = values.filter(t -> type.equals(t.sqlType.getVendorTypeNumber())); @@ -110,7 +109,7 @@ public final void execute(SqlSession session, ActionListener liste )) .collect(toList()); - listener.onResponse(Rows.of(output(), rows)); + listener.onResponse(of(session, rows)); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/CommandExec.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/CommandExec.java index f1fccb7e2c49d..43a7bfac4628b 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/CommandExec.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/CommandExec.java @@ -8,14 +8,16 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.plan.logical.command.Command; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.SqlSession; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.NodeInfo; +import org.elasticsearch.xpack.sql.tree.Source; import java.util.List; import java.util.Objects; +import static org.elasticsearch.action.ActionListener.wrap; + public class CommandExec extends LeafExec { private final Command command; @@ -35,8 +37,8 @@ public Command command() { } @Override - public void execute(SqlSession session, ActionListener listener) { - command.execute(session, listener); + public void execute(SqlSession session, ActionListener listener) { + command.execute(session, wrap(listener::onResponse, listener::onFailure)); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java index 6fa87ca90bbf8..6e132fb687110 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java @@ -9,7 +9,7 @@ import org.elasticsearch.xpack.sql.execution.search.Querier; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.querydsl.container.QueryContainer; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.NodeInfo; import org.elasticsearch.xpack.sql.tree.Source; @@ -53,7 +53,7 @@ public List output() { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { Querier scroller = new Querier(session); scroller.query(output, queryContainer, index, listener); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/LocalExec.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/LocalExec.java index cce19411465ef..c0adb1a98659a 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/LocalExec.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/LocalExec.java @@ -7,12 +7,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.EmptyExecutable; import org.elasticsearch.xpack.sql.session.Executable; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.NodeInfo; +import org.elasticsearch.xpack.sql.tree.Source; import java.util.List; import java.util.Objects; @@ -45,7 +45,7 @@ public boolean isEmpty() { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { executable.execute(session, listener); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/Unexecutable.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/Unexecutable.java index e1b08b3e492fc..a32d2f889909c 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/Unexecutable.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/Unexecutable.java @@ -7,15 +7,16 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.planner.PlanningException; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Executable; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; // this is mainly a marker interface to validate a plan before being executed public interface Unexecutable extends Executable { - default void execute(SqlSession session, ActionListener listener) { + @Override + default void execute(SqlSession session, ActionListener listener) { throw new PlanningException("Current plan {} is not executable", this); } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormatterCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormatterCursor.java index 4ab1d77fe21ff..3c19a497565f8 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormatterCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormatterCursor.java @@ -13,7 +13,6 @@ import org.elasticsearch.xpack.sql.action.BasicFormatter; import org.elasticsearch.xpack.sql.session.Configuration; import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.session.RowSet; import java.io.IOException; import java.util.Objects; @@ -59,7 +58,7 @@ public BasicFormatter getFormatter() { } @Override - public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { + public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { delegate.nextPage(cfg, client, registry, listener); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java index 3e9c30f49b453..35393e2a22998 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.security.SecurityContext; +import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import org.elasticsearch.xpack.sql.action.SqlQueryAction; import org.elasticsearch.xpack.sql.action.SqlQueryRequest; import org.elasticsearch.xpack.sql.action.SqlQueryResponse; @@ -25,6 +26,7 @@ import org.elasticsearch.xpack.sql.proto.ColumnInfo; import org.elasticsearch.xpack.sql.proto.Mode; import org.elasticsearch.xpack.sql.session.Configuration; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Cursors; import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.SchemaRowSet; @@ -71,20 +73,26 @@ public static void operation(PlanExecutor planExecutor, SqlQueryRequest request, // The configuration is always created however when dealing with the next page, only the timeouts are relevant // the rest having default values (since the query is already created) Configuration cfg = new Configuration(request.zoneId(), request.fetchSize(), request.requestTimeout(), request.pageTimeout(), - request.filter(), request.mode(), request.clientId(), username, clusterName, request.fieldMultiValueLeniency(), + request.filter(), request.mode(), request.clientId(), username, clusterName, request.fieldMultiValueLeniency(), request.indexIncludeFrozen()); if (Strings.hasText(request.cursor()) == false) { planExecutor.sql(cfg, request.query(), request.params(), - wrap(rowSet -> listener.onResponse(createResponse(request, rowSet)), listener::onFailure)); + wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), listener::onFailure)); } else { planExecutor.nextPage(cfg, Cursors.decodeFromString(request.cursor()), - wrap(rowSet -> listener.onResponse(createResponse(request.mode(), request.columnar(), rowSet, null)), + wrap(p -> listener.onResponse(createResponse(request.mode(), request.columnar(), null, p)), listener::onFailure)); } } - static SqlQueryResponse createResponse(SqlQueryRequest request, SchemaRowSet rowSet) { + static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) { + RowSet rset = page.rowSet(); + if ((rset instanceof SchemaRowSet) == false) { + throw new SqlIllegalArgumentException("No schema found inside {}", rset.getClass()); + } + SchemaRowSet rowSet = (SchemaRowSet) rset; + List columns = new ArrayList<>(rowSet.columnCount()); for (Schema.Entry entry : rowSet.schema()) { if (Mode.isDriver(request.mode())) { @@ -94,22 +102,22 @@ static SqlQueryResponse createResponse(SqlQueryRequest request, SchemaRowSet row } } columns = unmodifiableList(columns); - return createResponse(request.mode(), request.columnar(), rowSet, columns); + return createResponse(request.mode(), request.columnar(), columns, page); } - static SqlQueryResponse createResponse(Mode mode, boolean columnar, RowSet rowSet, List columns) { + static SqlQueryResponse createResponse(Mode mode, boolean columnar, List header, Page page) { List> rows = new ArrayList<>(); - rowSet.forEachRow(rowView -> { + page.rowSet().forEachRow(rowView -> { List row = new ArrayList<>(rowView.columnCount()); rowView.forEachColumn(row::add); rows.add(unmodifiableList(row)); }); return new SqlQueryResponse( - Cursors.encodeToString(Version.CURRENT, rowSet.nextPageCursor()), + Cursors.encodeToString(Version.CURRENT, page.next()), mode, columnar, - columns, + header, rows); } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java index ccb4a7cdc40d2..618b417b0d586 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java @@ -14,12 +14,35 @@ * Information required to access the next page of response. */ public interface Cursor extends NamedWriteable { + + class Page { + private final RowSet rowSet; + private final Cursor next; + + public Page(RowSet rowSet, Cursor next) { + this.rowSet = rowSet; + this.next = next; + } + + public RowSet rowSet() { + return rowSet; + } + + public Cursor next() { + return next; + } + + public static Page last(RowSet rowSet) { + return new Page(rowSet, EMPTY); + } + } + Cursor EMPTY = EmptyCursor.INSTANCE; /** * Request the next page of data. */ - void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener); + void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener); /** * Cleans the resources associated with the cursor diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java index f268754e70489..f1d18a597365e 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import org.elasticsearch.xpack.sql.execution.search.CompositeAggregationCursor; -import org.elasticsearch.xpack.sql.execution.search.PagingListCursor; import org.elasticsearch.xpack.sql.execution.search.ScrollCursor; import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractors; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors; @@ -49,7 +48,7 @@ public static List getNamedWriteables() { entries.add(new NamedWriteableRegistry.Entry(Cursor.class, ScrollCursor.NAME, ScrollCursor::new)); entries.add(new NamedWriteableRegistry.Entry(Cursor.class, CompositeAggregationCursor.NAME, CompositeAggregationCursor::new)); entries.add(new NamedWriteableRegistry.Entry(Cursor.class, TextFormatterCursor.NAME, TextFormatterCursor::new)); - entries.add(new NamedWriteableRegistry.Entry(Cursor.class, PagingListCursor.NAME, PagingListCursor::new)); + entries.add(new NamedWriteableRegistry.Entry(Cursor.class, ListCursor.NAME, ListCursor::new)); // plus all their dependencies entries.addAll(Processors.getNamedWriteables()); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java index fd9c63438a878..9f95b90494006 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java @@ -9,6 +9,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import java.io.IOException; @@ -31,8 +32,8 @@ public String getWriteableName() { } @Override - public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { - throw new IllegalArgumentException("there is no next page"); + public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { + throw new SqlIllegalArgumentException("there is no next page"); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyExecutable.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyExecutable.java index 09e0d3ac2a300..93a7e51c39b74 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyExecutable.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyExecutable.java @@ -7,6 +7,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import java.util.List; import java.util.Objects; @@ -25,8 +26,8 @@ public List output() { } @Override - public void execute(SqlSession session, ActionListener listener) { - listener.onResponse(Rows.empty(output)); + public void execute(SqlSession session, ActionListener listener) { + listener.onResponse(Page.last(Rows.empty(output))); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSet.java index 9237850998a58..9b6fa2aa3a06a 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSet.java @@ -39,11 +39,6 @@ public int size() { return 0; } - @Override - public Cursor nextPageCursor() { - return Cursor.EMPTY; - } - @Override public Schema schema() { return schema; diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Executable.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Executable.java index dbc163170291e..d1d78194ebe62 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Executable.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Executable.java @@ -5,14 +5,15 @@ */ package org.elasticsearch.xpack.sql.session; -import java.util.List; - import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; +import org.elasticsearch.xpack.sql.session.Cursor.Page; + +import java.util.List; public interface Executable { List output(); - void execute(SqlSession session, ActionListener listener); + void execute(SqlSession session, ActionListener listener); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListCursor.java similarity index 64% rename from x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursor.java rename to x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListCursor.java index 5a318eaa31f48..7e20abc31de95 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListCursor.java @@ -4,16 +4,14 @@ * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.sql.execution.search; +package org.elasticsearch.xpack.sql.session; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.xpack.sql.session.Configuration; -import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.session.RowSet; +import org.elasticsearch.xpack.sql.type.Schema; import java.io.IOException; import java.util.List; @@ -21,7 +19,7 @@ import static java.util.Collections.emptyList; -public class PagingListCursor implements Cursor { +public class ListCursor implements Cursor { public static final String NAME = "p"; @@ -29,14 +27,14 @@ public class PagingListCursor implements Cursor { private final int columnCount; private final int pageSize; - PagingListCursor(List> data, int columnCount, int pageSize) { + public ListCursor(List> data, int pageSize, int columnCount) { this.data = data; this.columnCount = columnCount; this.pageSize = pageSize; } @SuppressWarnings("unchecked") - public PagingListCursor(StreamInput in) throws IOException { + public ListCursor(StreamInput in) throws IOException { data = (List>) in.readGenericValue(); columnCount = in.readVInt(); pageSize = in.readVInt(); @@ -66,11 +64,27 @@ int pageSize() { return pageSize; } - @Override - public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { - // the check is really a safety measure since the page initialization handles it already (by returning an empty cursor) + public static Page of(Schema schema, List> data, int pageSize) { + return of(schema, data, pageSize, schema.size()); + } + + // NB: private since the columnCount is for public cases inferred by the columnCount + // only on the next-page the schema becomes null however that's an internal detail hence + // why this method is not exposed + private static Page of(Schema schema, List> data, int pageSize, int columnCount) { List> nextData = data.size() > pageSize ? data.subList(pageSize, data.size()) : emptyList(); - listener.onResponse(new PagingListRowSet(nextData, columnCount, pageSize)); + Cursor next = nextData.isEmpty() + ? Cursor.EMPTY + : new ListCursor(nextData, pageSize, columnCount); + List> currData = data.isEmpty() || pageSize == 0 + ? emptyList() + : data.size() == pageSize ? data : data.subList(0, Math.min(pageSize, data.size())); + return new Page(new ListRowSet(schema, currData, columnCount), next); + } + + @Override + public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { + listener.onResponse(of(Schema.EMPTY, data, pageSize, columnCount)); } @Override @@ -93,7 +107,7 @@ public boolean equals(Object obj) { return false; } - PagingListCursor other = (PagingListCursor) obj; + ListCursor other = (ListCursor) obj; return Objects.equals(pageSize, other.pageSize) && Objects.equals(columnCount, other.columnCount) && Objects.equals(data, other.data); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSet.java index 0122d333f7e9d..6bbbbaa462d05 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSet.java @@ -13,13 +13,20 @@ public class ListRowSet extends AbstractRowSet implements SchemaRowSet { private final Schema schema; private final List> list; + private final int columnCount; private int pos = 0; - protected ListRowSet(Schema schema, List> list) { + ListRowSet(Schema schema, List> list) { + this(schema, list, schema.size()); + } + + ListRowSet(Schema schema, List> list, int columnCount) { this.schema = schema; + this.columnCount = columnCount; this.list = list; } + @Override protected boolean doHasCurrent() { return pos < size(); @@ -50,12 +57,12 @@ public int size() { } @Override - public Cursor nextPageCursor() { - return Cursor.EMPTY; + public Schema schema() { + return schema; } @Override - public Schema schema() { - return schema; + public int columnCount() { + return columnCount; } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/RowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/RowSet.java index 38a22ff73f145..93d6745dc6594 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/RowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/RowSet.java @@ -22,11 +22,6 @@ public interface RowSet extends RowView { void reset(); - /** - * The key used by PlanExecutor#nextPage to fetch the next page. - */ - Cursor nextPageCursor(); - default void forEachRow(Consumer action) { for (boolean hasRows = hasCurrentRow(); hasRows; hasRows = advanceRow()) { action.accept(this); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SingletonExecutable.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SingletonExecutable.java index 8a526fac6dfe4..d86ac5fe00814 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SingletonExecutable.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SingletonExecutable.java @@ -7,18 +7,20 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.util.Check; -import java.util.Collections; import java.util.List; +import static java.util.Collections.emptyList; + public class SingletonExecutable implements Executable { private final List output; private final Object[] values; public SingletonExecutable() { - this(Collections.emptyList()); + this(emptyList()); } public SingletonExecutable(List output, Object... values) { @@ -33,8 +35,8 @@ public List output() { } @Override - public void execute(SqlSession session, ActionListener listener) { - listener.onResponse(Rows.singleton(output, values)); + public void execute(SqlSession session, ActionListener listener) { + listener.onResponse(Page.last(Rows.singleton(output, values))); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SingletonRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SingletonRowSet.java index c8a4e5eddfb6e..649caafc9be2b 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SingletonRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SingletonRowSet.java @@ -43,11 +43,6 @@ public int size() { return 1; } - @Override - public Cursor nextPageCursor() { - return Cursor.EMPTY; - } - @Override public Schema schema() { return schema; diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java index 6a5b5bd2ae5fd..023b443850c16 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java @@ -26,6 +26,7 @@ import org.elasticsearch.xpack.sql.planner.Planner; import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue; import org.elasticsearch.xpack.sql.rule.RuleExecutor; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import java.util.List; import java.util.function.Function; @@ -159,7 +160,7 @@ public void physicalPlan(LogicalPlan optimized, boolean verify, ActionListener

listener.onResponse(planner.plan(o, verify)), listener::onFailure)); } - public void sql(String sql, List params, ActionListener listener) { + public void sql(String sql, List params, ActionListener listener) { sqlExecutable(sql, params, wrap(e -> e.execute(this, listener), listener::onFailure)); } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumnsTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumnsTests.java index 9c8c32689b70e..509e8e954f353 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumnsTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumnsTests.java @@ -514,7 +514,7 @@ private void executeCommand(String sql, List params, Consume return Void.TYPE; }).when(resolver).resolveAsMergedMapping(any(), any(), anyBoolean(), any()); - tuple.v1().execute(tuple.v2(), wrap(consumer::accept, ex -> fail(ex.getMessage()))); + tuple.v1().execute(tuple.v2(), wrap(p -> consumer.accept((SchemaRowSet) p.rowSet()), ex -> fail(ex.getMessage()))); } private Tuple sql(String sql, List params, Map mapping) { diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTablesTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTablesTests.java index d4db97aba09cf..834c5808b707f 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTablesTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTablesTests.java @@ -386,6 +386,6 @@ private void executeCommand(String sql, List params, Consume return Void.TYPE; }).when(resolver).resolveNames(any(), any(), any(), any()); - tuple.v1().execute(tuple.v2(), wrap(consumer::accept, ex -> fail(ex.getMessage()))); + tuple.v1().execute(tuple.v2(), wrap(p -> consumer.accept((SchemaRowSet) p.rowSet()), ex -> fail(ex.getMessage()))); } } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypesTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypesTests.java index 805268dd5b687..6b7500cab661a 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypesTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypesTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry; import org.elasticsearch.xpack.sql.parser.SqlParser; import org.elasticsearch.xpack.sql.plan.logical.command.Command; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.type.DataType; import org.elasticsearch.xpack.sql.type.TypesTests; @@ -50,7 +51,8 @@ public void testSysTypes() { "INTERVAL_HOUR_TO_MINUTE", "INTERVAL_HOUR_TO_SECOND", "INTERVAL_MINUTE_TO_SECOND", "GEO_SHAPE", "GEO_POINT", "UNSUPPORTED", "OBJECT", "NESTED"); - cmd.execute(null, wrap(r -> { + cmd.execute(session(), wrap(p -> { + SchemaRowSet r = (SchemaRowSet) p.rowSet(); assertEquals(19, r.columnCount()); assertEquals(DataType.values().length, r.size()); assertFalse(r.schema().types().contains(DataType.NULL)); @@ -72,7 +74,8 @@ public void testSysTypes() { public void testSysTypesDefaultFiltering() { Command cmd = sql("SYS TYPES 0").v1(); - cmd.execute(null, wrap(r -> { + cmd.execute(session(), wrap(p -> { + SchemaRowSet r = (SchemaRowSet) p.rowSet(); assertEquals(DataType.values().length, r.size()); }, ex -> fail(ex.getMessage()))); } @@ -81,7 +84,8 @@ public void testSysTypesPositiveFiltering() { // boolean = 16 Command cmd = sql("SYS TYPES " + JDBCType.BOOLEAN.getVendorTypeNumber()).v1(); - cmd.execute(null, wrap(r -> { + cmd.execute(session(), wrap(p -> { + SchemaRowSet r = (SchemaRowSet) p.rowSet(); assertEquals(1, r.size()); assertEquals("BOOLEAN", r.column(0)); }, ex -> fail(ex.getMessage()))); @@ -90,7 +94,8 @@ public void testSysTypesPositiveFiltering() { public void testSysTypesNegativeFiltering() { Command cmd = sql("SYS TYPES " + JDBCType.TINYINT.getVendorTypeNumber()).v1(); - cmd.execute(null, wrap(r -> { + cmd.execute(session(), wrap(p -> { + SchemaRowSet r = (SchemaRowSet) p.rowSet(); assertEquals(1, r.size()); assertEquals("BYTE", r.column(0)); }, ex -> fail(ex.getMessage()))); @@ -99,7 +104,8 @@ public void testSysTypesNegativeFiltering() { public void testSysTypesMultipleMatches() { Command cmd = sql("SYS TYPES " + JDBCType.VARCHAR.getVendorTypeNumber()).v1(); - cmd.execute(null, wrap(r -> { + cmd.execute(session(), wrap(p -> { + SchemaRowSet r = (SchemaRowSet) p.rowSet(); assertEquals(3, r.size()); assertEquals("KEYWORD", r.column(0)); assertTrue(r.advanceRow()); @@ -108,4 +114,8 @@ public void testSysTypesMultipleMatches() { assertEquals("IP", r.column(0)); }, ex -> fail(ex.getMessage()))); } + + private static SqlSession session() { + return new SqlSession(TestUtils.TEST_CFG, null, null, null, null, null, null, null, null); + } } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java similarity index 63% rename from x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursorTests.java rename to x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java index c042b99ddf331..2eff1c5c5f320 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursorTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java @@ -3,21 +3,22 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.sql.execution.search; +package org.elasticsearch.xpack.sql.session; import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.sql.session.Cursors; +import org.elasticsearch.xpack.sql.session.ListCursor; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -public class PagingListCursorTests extends AbstractWireSerializingTestCase { - public static PagingListCursor randomPagingListCursor() { +public class ListCursorTests extends AbstractWireSerializingTestCase { + public static ListCursor randomPagingListCursor() { int size = between(1, 20); int depth = between(1, 20); @@ -26,14 +27,14 @@ public static PagingListCursor randomPagingListCursor() { values.add(Arrays.asList(randomArray(depth, s -> new Object[depth], () -> randomByte()))); } - return new PagingListCursor(values, depth, between(1, 20)); + return new ListCursor(values, between(1, 20), depth); } @Override - protected PagingListCursor mutateInstance(PagingListCursor instance) throws IOException { - return new PagingListCursor(instance.data(), - instance.columnCount(), - randomValueOtherThan(instance.pageSize(), () -> between(1, 20))); + protected ListCursor mutateInstance(ListCursor instance) throws IOException { + return new ListCursor(instance.data(), + randomValueOtherThan(instance.pageSize(), () -> between(1, 20)), + instance.columnCount()); } @Override @@ -42,22 +43,22 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { } @Override - protected PagingListCursor createTestInstance() { + protected ListCursor createTestInstance() { return randomPagingListCursor(); } @Override - protected Reader instanceReader() { - return PagingListCursor::new; + protected Reader instanceReader() { + return ListCursor::new; } @Override - protected PagingListCursor copyInstance(PagingListCursor instance, Version version) throws IOException { + protected ListCursor copyInstance(ListCursor instance, Version version) throws IOException { /* Randomly choose between internal protocol round trip and String based * round trips used to toXContent. */ if (randomBoolean()) { return super.copyInstance(instance, version); } - return (PagingListCursor) Cursors.decodeFromString(Cursors.encodeToString(version, instance)); + return (ListCursor) Cursors.decodeFromString(Cursors.encodeToString(version, instance)); } } \ No newline at end of file