SQL: Fix issue with timezone when paginating #52101

Merged · 5 commits · Feb 11, 2020

@@ -16,7 +16,12 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Properties;

import static org.elasticsearch.xpack.sql.qa.jdbc.JdbcTestUtils.JDBC_TIMEZONE;
import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.assertNoSearchContexts;

/**
@@ -102,6 +107,51 @@ public void testIncompleteScroll() throws Exception {
assertNoSearchContexts();
}

public void testScrollWithDatetimeAndTimezoneParam() throws IOException, SQLException {
Request request = new Request("PUT", "/test_date_timezone");
XContentBuilder createIndex = JsonXContent.contentBuilder().startObject();
createIndex.startObject("mappings");
{
createIndex.startObject("properties");
{
createIndex.startObject("date").field("type", "date").field("format", "epoch_millis");
createIndex.endObject();
}
createIndex.endObject();
}
createIndex.endObject().endObject();
request.setJsonEntity(Strings.toString(createIndex));
client().performRequest(request);

request = new Request("PUT", "/test_date_timezone/_bulk");
request.addParameter("refresh", "true");
StringBuilder bulk = new StringBuilder();
long[] datetimes = new long[] { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 };
for (long datetime : datetimes) {
bulk.append("{\"index\":{}}\n");
bulk.append("{\"date\":").append(datetime).append("}\n");
}
request.setJsonEntity(bulk.toString());
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());

ZoneId zoneId = randomZone();
Properties connectionProperties = connectionProperties();
connectionProperties.put(JDBC_TIMEZONE, zoneId.toString());
try (Connection c = esJdbc(connectionProperties);
Statement s = c.createStatement()) {

Contributor: Can you move this line onto the one above?

Contributor: An appropriate formatter should help in these style-adjusting cases.
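
For illustration, the request is presumably to join the wrapped try-with-resources header onto a single line (an assumption; the thread does not show the exact change applied):

// Hedged sketch of the suggested style fix: the two-line try header above, joined.
try (Connection c = esJdbc(connectionProperties); Statement s = c.createStatement()) {
    // ... body unchanged ...
}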

s.setFetchSize(2);
try (ResultSet rs =
s.executeQuery("SELECT DATE_PART('TZOFFSET', date) FROM test_date_timezone ORDER BY date")) {
for (int i = 0; i < datetimes.length; i++) {
assertEquals(2, rs.getFetchSize());
assertTrue("No more entries left at " + i, rs.next());
assertEquals(ZonedDateTime.ofInstant(Instant.ofEpochMilli(datetimes[i]), zoneId).getOffset()
.getTotalSeconds() / 60, rs.getInt(1));
}
assertFalse(rs.next());
}
}
}
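
As a worked example of the offset arithmetic asserted above (a self-contained sketch; Europe/Athens is an assumption for illustration, standing in for randomZone()):

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Worked example of the expected value. Assumption: Europe/Athens,
// which was at UTC+2 on 1970-01-01.
public class TzOffsetExample {
    public static void main(String[] args) {
        ZoneId zoneId = ZoneId.of("Europe/Athens");
        int offsetMinutes = ZonedDateTime.ofInstant(Instant.ofEpochMilli(1_000), zoneId)
            .getOffset().getTotalSeconds() / 60;
        // 7_200 seconds / 60 = 120 minutes, the value DATE_PART('TZOFFSET', date)
        // is expected to return for every row in this zone.
        System.out.println(offsetMinutes); // 120
    }
}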

/**
* Test for {@code SELECT} that is implemented as an aggregation.
@@ -237,4 +287,4 @@ private void addPivotData() throws Exception {
request.setJsonEntity(bulk.toString());
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
}
}
}
@@ -6,7 +6,6 @@
package org.elasticsearch.xpack.sql.qa.rest;

import com.fasterxml.jackson.core.io.JsonStringEncoder;

import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
@@ -15,9 +14,11 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.NotEqualMessageBuilder;
@@ -31,6 +32,9 @@
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.JDBCType;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -151,6 +155,70 @@ public void testNextPage() throws IOException {
ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode));
}

public void testNextPageWithDatetimeAndTimezoneParam() throws IOException {
Request request = new Request("PUT", "/test_date_timezone");
XContentBuilder createIndex = JsonXContent.contentBuilder().startObject();
createIndex.startObject("mappings");
{
createIndex.startObject("properties");
{
createIndex.startObject("date").field("type", "date").field("format", "epoch_millis");
createIndex.endObject();
}
createIndex.endObject();
}
createIndex.endObject().endObject();
request.setJsonEntity(Strings.toString(createIndex));
client().performRequest(request);

request = new Request("PUT", "/test_date_timezone/_bulk");
request.addParameter("refresh", "true");
StringBuilder bulk = new StringBuilder();
long[] datetimes = new long[] { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 };
for (long datetime : datetimes) {
bulk.append("{\"index\":{}}\n");
bulk.append("{\"date\":").append(datetime).append("}\n");
}
request.setJsonEntity(bulk.toString());
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());

ZoneId zoneId = randomZone();
String mode = randomMode();
String sqlRequest =
"{\"query\":\"SELECT DATE_PART('TZOFFSET', date) AS tz FROM test_date_timezone ORDER BY date\","
+ "\"time_zone\":\"" + zoneId.getId() + "\", "
+ "\"mode\":\"" + mode + "\", "
+ "\"fetch_size\":2}";

Contributor: I think it's perfectly fine as is, but I was curious about the reason, if any, for choosing a fetch_size of 2 vs. 1, which would simplify the test just a bit.

Contributor Author: Fetch size 2 and an odd number of rows (5) make the last page return 1 row.

Contributor (@bpintea, Feb 10, 2020): Sure, there'll be pages with 1 and 2 rows, as the j-based for loop makes obvious. But I was only curious to understand why that is desired with respect to what the test checks (i.e. that all rows have the same timezone offset). Sorry if I'm missing anything obvious. :-)

Contributor Author: It doesn't have to do with the bug being fixed; it's just additional safety testing that the fetch size behavior is correct.
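
A minimal sketch of the paging arithmetic described here, using the same five timestamps and fetch size as the test:

// With 5 rows and pages of 2, the pages hold 2, 2 and then 1 rows.
public class PageSplitExample {
    public static void main(String[] args) {
        long[] datetimes = { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 };
        int fetchSize = 2;
        for (int from = 0; from < datetimes.length; from += fetchSize) {
            int rows = Math.min(fetchSize, datetimes.length - from);
            System.out.println("page holds " + rows + " row(s)"); // 2, 2, 1
        }
    }
}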


String cursor = null;
for (int i = 0; i <= datetimes.length; i += 2) {
Map<String, Object> expected = new HashMap<>();
Map<String, Object> response;

if (i == 0) {
expected.put("columns", singletonList(columnInfo(mode, "tz", "integer", JDBCType.INTEGER, 11)));
response = runSql(new StringEntity(sqlRequest, ContentType.APPLICATION_JSON), "", mode);
} else {
response = runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(mode) + "}",
ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode);
}

List<Object> values = new ArrayList<>(2);
for (int j = 0; j < (i < datetimes.length - 1 ? 2 : 1); j++) {
values.add(singletonList(ZonedDateTime.ofInstant(Instant.ofEpochMilli(datetimes[i + j]), zoneId)
.getOffset().getTotalSeconds() / 60));
}
expected.put("rows", values);
cursor = (String) response.remove("cursor");
assertResponse(expected, response);
assertNotNull(cursor);
}
Map<String, Object> expected = new HashMap<>();
expected.put("rows", emptyList());
assertResponse(expected, runSql(new StringEntity("{ \"cursor\":\"" + cursor + "\"" + mode(mode) + "}",
ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode));
}

@AwaitsFix(bugUrl = "Unclear status, https://github.com/elastic/x-pack-elasticsearch/issues/2074")
public void testTimeZone() throws IOException {
String mode = randomMode();
@@ -43,7 +43,7 @@ protected void doExecute(Task task, SqlClearCursorRequest request, ActionListene

public static void operation(PlanExecutor planExecutor, SqlClearCursorRequest request,
ActionListener<SqlClearCursorResponse> listener) {
Cursor cursor = Cursors.decodeFromString(request.getCursor());
Cursor cursor = Cursors.decodeFromStringWithZone(request.getCursor()).v1();
planExecutor.cleanCursor(
new Configuration(DateUtils.UTC, Protocol.FETCH_SIZE, Protocol.REQUEST_TIMEOUT, Protocol.PAGE_TIMEOUT, null,
request.mode(), StringUtils.EMPTY, StringUtils.EMPTY, StringUtils.EMPTY, Protocol.FIELD_MULTI_VALUE_LENIENCY,
@@ -10,6 +10,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
@@ -26,12 +27,14 @@
import org.elasticsearch.xpack.sql.proto.ColumnInfo;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.Cursors;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.type.SqlDataTypes;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;

@@ -68,7 +71,7 @@ protected void doExecute(Task task, SqlQueryRequest request, ActionListener<SqlQ
/**
* Actual implementation of the action. Statically available to support embedded mode.
*/
public static void operation(PlanExecutor planExecutor, SqlQueryRequest request, ActionListener<SqlQueryResponse> listener,
static void operation(PlanExecutor planExecutor, SqlQueryRequest request, ActionListener<SqlQueryResponse> listener,
String username, String clusterName) {
// The configuration is always created however when dealing with the next page, only the timeouts are relevant
// the rest having default values (since the query is already created)
@@ -80,13 +83,14 @@ public static void operation(PlanExecutor planExecutor, SqlQueryRequest request,
planExecutor.sql(cfg, request.query(), request.params(),
wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), listener::onFailure));
} else {
planExecutor.nextPage(cfg, Cursors.decodeFromString(request.cursor()),
wrap(p -> listener.onResponse(createResponse(request, null, p)),
Tuple<Cursor, ZoneId> decoded = Cursors.decodeFromStringWithZone(request.cursor());

Member: Is Cursors.decodeFromString used anywhere else? It might be that decoding a tuple with the timezone, or adding it as a property in the Cursor class, should be the default.

Contributor Author: I'm planning to do that in an upcoming PR.

Contributor Author: As discussed, I will not pass the zoneId into the Cursor, as the zoneId is the responsibility of SqlInput/OutputStreams to handle. Instead, I removed decodeFromString and only use decodeFromStringWithZone, so it is obvious to consumers that the zoneId is always decoded as well and returned as part of the Tuple.
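
A condensed sketch of the resulting consumer-side pattern (class and method names as they appear in this diff):

import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.Cursors;

import java.time.ZoneId;

// Sketch: the zone is always decoded together with the cursor,
// so a caller can no longer forget it.
public class DecodeSketch {
    static void nextPage(String base64Cursor) {
        Tuple<Cursor, ZoneId> decoded = Cursors.decodeFromStringWithZone(base64Cursor);
        Cursor cursor = decoded.v1(); // paging state, handed to planExecutor.nextPage(...)
        ZoneId zoneId = decoded.v2(); // zone the cursor was encoded with, handed to createResponse(...)
    }
}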

planExecutor.nextPage(cfg, decoded.v1(),
wrap(p -> listener.onResponse(createResponse(request, decoded.v2(), null, p)),
listener::onFailure));
}
}

static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) {
private static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) {
RowSet rset = page.rowSet();
if ((rset instanceof SchemaRowSet) == false) {
throw new SqlIllegalArgumentException("No schema found inside {}", rset.getClass());
@@ -102,10 +106,10 @@ static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page p
}
}
columns = unmodifiableList(columns);
return createResponse(request, columns, page);
return createResponse(request, request.zoneId(), columns, page);
}

static SqlQueryResponse createResponse(SqlQueryRequest request, List<ColumnInfo> header, Page page) {
private static SqlQueryResponse createResponse(SqlQueryRequest request, ZoneId zoneId, List<ColumnInfo> header, Page page) {
List<List<Object>> rows = new ArrayList<>();
page.rowSet().forEachRow(rowView -> {
List<Object> row = new ArrayList<>(rowView.columnCount());
@@ -114,7 +118,7 @@ static SqlQueryResponse createResponse(SqlQueryRequest request, List<ColumnInfo>
});

return new SqlQueryResponse(
Cursors.encodeToString(page.next(), request.zoneId()),
Cursors.encodeToString(page.next(), zoneId),
request.mode(),
request.columnar(),
header,
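
To summarize the fix visible in this file: on a next-page request only the timeouts in the SqlQueryRequest are meaningful (as the in-code comment above notes), so request.zoneId() is not reliable there; the zone is instead recovered from the cursor itself. A condensed sketch of the two paths, using the method names from this diff:

// First page: the zone comes from the request and is encoded into the cursor.
//     createResponse(request, request.zoneId(), columns, page)
//         -> Cursors.encodeToString(page.next(), zoneId)
// Subsequent pages: the zone is recovered from the cursor, not from the request.
//     Tuple<Cursor, ZoneId> decoded = Cursors.decodeFromStringWithZone(request.cursor());
//     createResponse(request, decoded.v2(), null, page)
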
@@ -83,14 +83,6 @@ static String encodeToString(Cursor info, Version version, ZoneId zoneId) {
}
}


/**
* Read a {@linkplain Cursor} from a string.
*/
public static Cursor decodeFromString(String base64) {
return decodeFromStringWithZone(base64).v1();
}

/**
* Read a {@linkplain Cursor} from a string.
*/
@@ -12,6 +12,7 @@
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
import org.elasticsearch.xpack.sql.execution.search.extractor.ComputingExtractorTests;
import org.elasticsearch.xpack.sql.plugin.CursorTests;
import org.elasticsearch.xpack.sql.session.Cursors;

import java.io.IOException;
@@ -68,6 +69,6 @@ protected ScrollCursor copyInstance(ScrollCursor instance, Version version) thro
if (randomBoolean()) {
return super.copyInstance(instance, version);
}
return (ScrollCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone()));
return (ScrollCursor) CursorTests.decodeFromString(Cursors.encodeToString(instance, randomZone()));
}
}
@@ -104,15 +104,19 @@ static Cursor randomNonEmptyCursor() {

public void testVersionHandling() {
Cursor cursor = randomNonEmptyCursor();
assertEquals(cursor, Cursors.decodeFromString(Cursors.encodeToString(cursor, randomZone())));
assertEquals(cursor, decodeFromString(Cursors.encodeToString(cursor, randomZone())));

Version nextMinorVersion = Version.fromId(Version.CURRENT.id + 10000);

String encodedWithWrongVersion = CursorsTestUtil.encodeToString(cursor, nextMinorVersion, randomZone());
SqlIllegalArgumentException exception = expectThrows(SqlIllegalArgumentException.class,
() -> Cursors.decodeFromString(encodedWithWrongVersion));
() -> decodeFromString(encodedWithWrongVersion));

assertEquals(LoggerMessageFormat.format("Unsupported cursor version [{}], expected [{}]", nextMinorVersion, Version.CURRENT),
exception.getMessage());
}
}

public static Cursor decodeFromString(String base64) {
return Cursors.decodeFromStringWithZone(base64).v1();
}
}
@@ -7,14 +7,16 @@

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.test.AbstractWireTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
import org.elasticsearch.xpack.sql.plugin.CursorTests;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListCursorTests extends AbstractWireTestCase<ListCursor> {
public class ListCursorTests extends AbstractSqlWireSerializingTestCase<ListCursor> {
public static ListCursor randomPagingListCursor() {
int size = between(1, 20);
int depth = between(1, 20);
@@ -44,13 +46,18 @@ protected ListCursor createTestInstance() {
return randomPagingListCursor();
}

@Override
protected Writeable.Reader<ListCursor> instanceReader() {
return ListCursor::new;
}

@Override
protected ListCursor copyInstance(ListCursor instance, Version version) throws IOException {
/* Randomly choose between internal protocol round trip and String based
* round trips used to toXContent. */
if (randomBoolean()) {
return copyWriteable(instance, getNamedWriteableRegistry(), ListCursor::new, version);
}
return (ListCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone()));
return (ListCursor) CursorTests.decodeFromString(Cursors.encodeToString(instance, randomZone()));
}
}
}