Skip to content

Commit

Permalink
SQL: Refactor away the cycle between RowSet and Cursor (#45516)
Browse files Browse the repository at this point in the history
* SQL: Refactor away the cycle between RowSet and Cursor

Improve encapsulation of rowset pagination by breaking the cycle
between a cursor and its associated rowset implementation; all paging
logic now resides inside each cursor implementation.
  • Loading branch information
costin authored Aug 16, 2019
1 parent 3fef651 commit be8fe0a
Show file tree
Hide file tree
Showing 42 changed files with 377 additions and 352 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.stats.Metrics;
import org.elasticsearch.xpack.sql.stats.QueryMetric;
Expand Down Expand Up @@ -91,7 +90,7 @@ public void searchSource(Configuration cfg, String sql, List<SqlTypedParamValue>
}, listener::onFailure));
}

public void sql(Configuration cfg, String sql, List<SqlTypedParamValue> params, ActionListener<SchemaRowSet> listener) {
public void sql(Configuration cfg, String sql, List<SqlTypedParamValue> params, ActionListener<Page> listener) {
QueryMetric metric = QueryMetric.from(cfg.mode(), cfg.clientId());
metrics.total(metric);

Expand All @@ -101,7 +100,7 @@ public void sql(Configuration cfg, String sql, List<SqlTypedParamValue> params,
}));
}

public void nextPage(Configuration cfg, Cursor cursor, ActionListener<RowSet> listener) {
public void nextPage(Configuration cfg, Cursor cursor, ActionListener<Page> listener) {
QueryMetric metric = QueryMetric.from(cfg.mode(), cfg.clientId());
metrics.total(metric);
metrics.paging(metric);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import org.elasticsearch.xpack.sql.querydsl.agg.Aggs;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.type.Schema;
import org.elasticsearch.xpack.sql.util.StringUtils;

import java.io.IOException;
Expand All @@ -36,6 +37,8 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* Cursor for composite aggregation (GROUP BY).
Expand Down Expand Up @@ -116,7 +119,7 @@ boolean includeFrozen() {
}

@Override
public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener<RowSet> listener) {
public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener<Page> listener) {
SearchSourceBuilder q;
try {
q = deserializeQuery(registry, nextQuery);
Expand All @@ -135,21 +138,11 @@ public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry re
client.search(search, new ActionListener<>() {
@Override
public void onResponse(SearchResponse r) {
try {
// retry
if (shouldRetryDueToEmptyPage(r)) {
CompositeAggregationCursor.updateCompositeAfterKey(r, search.source());
client.search(search, this);
return;
}

boolean hasAfterKey = updateCompositeAfterKey(r, query);
CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit,
hasAfterKey ? serializeQuery(query) : null, includeFrozen, indices);
listener.onResponse(rowSet);
} catch (Exception ex) {
listener.onFailure(ex);
}
handle(r, search.source(), ba -> new CompositeAggsRowSet(extractors, mask, r, limit, ba),
() -> client.search(search, this),
p -> listener.onResponse(p),
e -> listener.onFailure(e),
Schema.EMPTY, includeFrozen, indices);
}

@Override
Expand All @@ -159,6 +152,39 @@ public void onFailure(Exception ex) {
});
}

/**
 * Shared pagination handler for composite-aggregation cursors: inspects the
 * {@code SearchResponse} and either (a) retries the search when the current page
 * is empty but a next page exists, (b) builds the next {@code Page} with its
 * follow-up cursor, or (c) emits a final empty page when there are no results.
 *
 * @param response      search response carrying the composite aggregation results
 * @param source        search source that produced {@code response}; its composite
 *                      "after" key is advanced via {@code updateCompositeAfterKey}
 * @param makeRowSet    factory turning the serialized next-page query (possibly
 *                      {@code null} when there is no after-key) into a row set
 * @param retry         action that re-runs the search after the after-key is advanced
 * @param onPage        callback invoked with the resulting page
 * @param onFailure     callback invoked with any exception raised while building the page
 * @param schema        schema used for the empty row set when there are no results
 * @param includeFrozen whether frozen indices are included by the follow-up cursor
 * @param indices       indices targeted by the follow-up cursor
 */
static void handle(SearchResponse response, SearchSourceBuilder source, Function<byte[], CompositeAggsRowSet> makeRowSet,
        Runnable retry, Consumer<Page> onPage, Consumer<Exception> onFailure,
        Schema schema, boolean includeFrozen, String[] indices) {

    // there are some results
    if (response.getAggregations().asList().isEmpty() == false) {
        // retry: the page has no buckets but a next page exists — advance the
        // after-key and re-run the search rather than returning an empty page
        if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) {
            CompositeAggregationCursor.updateCompositeAfterKey(response, source);
            retry.run();
            return;
        }

        try {
            // advance the after-key; only serialize the query for a follow-up
            // request when there actually is a next page to fetch
            boolean hasAfterKey = updateCompositeAfterKey(response, source);
            byte[] queryAsBytes = hasAfterKey ? serializeQuery(source) : null;
            CompositeAggsRowSet rowSet = makeRowSet.apply(queryAsBytes);

            // no remaining data means this is the last page — no follow-up cursor
            Cursor next = rowSet.remainingData() == 0
                    ? Cursor.EMPTY
                    : new CompositeAggregationCursor(queryAsBytes, rowSet.extractors(), rowSet.mask(),
                            rowSet.remainingData(), includeFrozen, indices);
            onPage.accept(new Page(rowSet, next));
        } catch (Exception ex) {
            // surface serialization/extraction failures to the caller instead of throwing
            onFailure.accept(ex);
        }
    }
    // no results: emit a terminal page with an empty row set of the given schema
    else {
        onPage.accept(Page.last(Rows.empty(schema)));
    }
}

static boolean shouldRetryDueToEmptyPage(SearchResponse response) {
CompositeAggregation composite = getComposite(response);
// if there are no buckets but a next page, go fetch it instead of sending an empty response to the client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractor;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSet;

import java.util.BitSet;
Expand All @@ -22,14 +21,11 @@
class CompositeAggsRowSet extends ResultRowSet<BucketExtractor> {

private final List<? extends CompositeAggregation.Bucket> buckets;

private final Cursor cursor;

private final int remainingData;
private final int size;
private int row = 0;

CompositeAggsRowSet(List<BucketExtractor> exts, BitSet mask, SearchResponse response,
int limit, byte[] next, boolean includeFrozen, String... indices) {
CompositeAggsRowSet(List<BucketExtractor> exts, BitSet mask, SearchResponse response, int limit, byte[] next) {
super(exts, mask);

CompositeAggregation composite = CompositeAggregationCursor.getComposite(response);
Expand All @@ -43,7 +39,7 @@ class CompositeAggsRowSet extends ResultRowSet<BucketExtractor> {
size = limit == -1 ? buckets.size() : Math.min(buckets.size(), limit);

if (next == null) {
cursor = Cursor.EMPTY;
remainingData = 0;
} else {
// Compute remaining limit

Expand All @@ -56,9 +52,9 @@ class CompositeAggsRowSet extends ResultRowSet<BucketExtractor> {
// however the Querier takes care of that and keeps making requests until either the query is invalid or at least one response
// is returned.
if (size == 0 || remainingLimit == 0) {
cursor = Cursor.EMPTY;
remainingData = 0;
} else {
cursor = new CompositeAggregationCursor(next, exts, mask, remainingLimit, includeFrozen, indices);
remainingData = remainingLimit;
}
}
}
Expand Down Expand Up @@ -92,8 +88,7 @@ public int size() {
return size;
}

@Override
public Cursor nextPageCursor() {
return cursor;
int remainingData() {
return remainingData;
}
}
}

This file was deleted.

Loading

0 comments on commit be8fe0a

Please sign in to comment.