Skip to content

Commit

Permalink
SQL: Fix issue with timezone when paginating (#52101)
Browse files Browse the repository at this point in the history
Previously, when the specified (or default) fetchSize led to
subsequent HTTP requests and the usage of cursors, those subsequent
were no longer using the client timezone specified in the initial
SQL query. As a consequence, Even though the query is executed once
(with the correct timezone) the processing of the query results by
the HitExtractors in the next pages was done using the default
timezone Z. This could lead to incorrect results.

Fix the issue by correctly using the initially specified timezone,
which is found in the deserialisation of the cursor string.

Fixes: #51258
(cherry picked from commit 8f7afbd)
  • Loading branch information
matriv committed Feb 11, 2020
1 parent b072ebd commit 3c8ea17
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Properties;

import static org.elasticsearch.xpack.sql.qa.jdbc.JdbcTestUtils.JDBC_TIMEZONE;
import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.assertNoSearchContexts;

/**
Expand Down Expand Up @@ -102,6 +107,51 @@ public void testIncompleteScroll() throws Exception {
assertNoSearchContexts();
}

public void testScrollWithDatetimeAndTimezoneParam() throws IOException, SQLException {
Request request = new Request("PUT", "/test_date_timezone");
XContentBuilder createIndex = JsonXContent.contentBuilder().startObject();
createIndex.startObject("mappings");
{
createIndex.startObject("properties");
{
createIndex.startObject("date").field("type", "date").field("format", "epoch_millis");
createIndex.endObject();
}
createIndex.endObject();
}
createIndex.endObject().endObject();
request.setJsonEntity(Strings.toString(createIndex));
client().performRequest(request);

request = new Request("PUT", "/test_date_timezone/_bulk");
request.addParameter("refresh", "true");
StringBuilder bulk = new StringBuilder();
long[] datetimes = new long[] { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 };
for (long datetime : datetimes) {
bulk.append("{\"index\":{}}\n");
bulk.append("{\"date\":").append(datetime).append("}\n");
}
request.setJsonEntity(bulk.toString());
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());

ZoneId zoneId = randomZone();
Properties connectionProperties = connectionProperties();
connectionProperties.put(JDBC_TIMEZONE, zoneId.toString());
try (Connection c = esJdbc(connectionProperties);
Statement s = c.createStatement()) {
s.setFetchSize(2);
try (ResultSet rs =
s.executeQuery("SELECT DATE_PART('TZOFFSET', date) FROM test_date_timezone ORDER BY date")) {
for (int i = 0; i < datetimes.length; i++) {
assertEquals(2, rs.getFetchSize());
assertTrue("No more entries left at " + i, rs.next());
assertEquals(ZonedDateTime.ofInstant(Instant.ofEpochMilli(datetimes[i]), zoneId).getOffset()
.getTotalSeconds()/ 60, rs.getInt(1));
}
assertFalse(rs.next());
}
}
}

/**
* Test for {@code SELECT} that is implemented as an aggregation.
Expand Down Expand Up @@ -237,4 +287,4 @@ private void addPivotData() throws Exception {
request.setJsonEntity(bulk.toString());
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.elasticsearch.xpack.sql.qa.rest;

import com.fasterxml.jackson.core.io.JsonStringEncoder;

import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
Expand All @@ -15,8 +14,10 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.NotEqualMessageBuilder;
Expand All @@ -31,6 +32,9 @@
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.JDBCType;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -151,6 +155,70 @@ public void testNextPage() throws IOException {
ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode));
}

public void testNextPageWithDatetimeAndTimezoneParam() throws IOException {
Request request = new Request("PUT", "/test_date_timezone");
XContentBuilder createIndex = JsonXContent.contentBuilder().startObject();
createIndex.startObject("mappings");
{
createIndex.startObject("properties");
{
createIndex.startObject("date").field("type", "date").field("format", "epoch_millis");
createIndex.endObject();
}
createIndex.endObject();
}
createIndex.endObject().endObject();
request.setJsonEntity(Strings.toString(createIndex));
client().performRequest(request);

request = new Request("PUT", "/test_date_timezone/_bulk");
request.addParameter("refresh", "true");
StringBuilder bulk = new StringBuilder();
long[] datetimes = new long[] { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 };
for (long datetime : datetimes) {
bulk.append("{\"index\":{}}\n");
bulk.append("{\"date\":").append(datetime).append("}\n");
}
request.setJsonEntity(bulk.toString());
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());

ZoneId zoneId = randomZone();
String mode = randomMode();
String sqlRequest =
"{\"query\":\"SELECT DATE_PART('TZOFFSET', date) AS tz FROM test_date_timezone ORDER BY date\","
+ "\"time_zone\":\"" + zoneId.getId() + "\", "
+ "\"mode\":\"" + mode + "\", "
+ "\"fetch_size\":2}";

String cursor = null;
for (int i = 0; i <= datetimes.length; i += 2) {
Map<String, Object> expected = new HashMap<>();
Map<String, Object> response;

if (i == 0) {
expected.put("columns", singletonList(columnInfo(mode, "tz", "integer", JDBCType.INTEGER, 11)));
response = runSql(new StringEntity(sqlRequest, ContentType.APPLICATION_JSON), "", mode);
} else {
response = runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(mode) + "}",
ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode);
}

List<Object> values = new ArrayList<>(2);
for (int j = 0; j < (i < datetimes.length - 1 ? 2 : 1); j++) {
values.add(singletonList(ZonedDateTime.ofInstant(Instant.ofEpochMilli(datetimes[i + j]), zoneId)
.getOffset().getTotalSeconds() / 60));
}
expected.put("rows", values);
cursor = (String) response.remove("cursor");
assertResponse(expected, response);
assertNotNull(cursor);
}
Map<String, Object> expected = new HashMap<>();
expected.put("rows", emptyList());
assertResponse(expected, runSql(new StringEntity("{ \"cursor\":\"" + cursor + "\"" + mode(mode) + "}",
ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode));
}

@AwaitsFix(bugUrl = "Unclear status, https://github.com/elastic/x-pack-elasticsearch/issues/2074")
public void testTimeZone() throws IOException {
String mode = randomMode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected void doExecute(Task task, SqlClearCursorRequest request, ActionListene

public static void operation(PlanExecutor planExecutor, SqlClearCursorRequest request,
ActionListener<SqlClearCursorResponse> listener) {
Cursor cursor = Cursors.decodeFromString(request.getCursor());
Cursor cursor = Cursors.decodeFromStringWithZone(request.getCursor()).v1();
planExecutor.cleanCursor(
new Configuration(DateUtils.UTC, Protocol.FETCH_SIZE, Protocol.REQUEST_TIMEOUT, Protocol.PAGE_TIMEOUT, null,
request.mode(), StringUtils.EMPTY, StringUtils.EMPTY, StringUtils.EMPTY, Protocol.FIELD_MULTI_VALUE_LENIENCY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
Expand All @@ -25,12 +26,14 @@
import org.elasticsearch.xpack.sql.proto.ColumnInfo;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.Cursors;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.type.Schema;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -67,7 +70,7 @@ protected void doExecute(Task task, SqlQueryRequest request, ActionListener<SqlQ
/**
* Actual implementation of the action. Statically available to support embedded mode.
*/
public static void operation(PlanExecutor planExecutor, SqlQueryRequest request, ActionListener<SqlQueryResponse> listener,
static void operation(PlanExecutor planExecutor, SqlQueryRequest request, ActionListener<SqlQueryResponse> listener,
String username, String clusterName) {
// The configuration is always created however when dealing with the next page, only the timeouts are relevant
// the rest having default values (since the query is already created)
Expand All @@ -79,13 +82,14 @@ public static void operation(PlanExecutor planExecutor, SqlQueryRequest request,
planExecutor.sql(cfg, request.query(), request.params(),
wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), listener::onFailure));
} else {
planExecutor.nextPage(cfg, Cursors.decodeFromString(request.cursor()),
wrap(p -> listener.onResponse(createResponse(request, null, p)),
Tuple<Cursor, ZoneId> decoded = Cursors.decodeFromStringWithZone(request.cursor());
planExecutor.nextPage(cfg, decoded.v1(),
wrap(p -> listener.onResponse(createResponse(request, decoded.v2(), null, p)),
listener::onFailure));
}
}

static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) {
private static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) {
RowSet rset = page.rowSet();
if ((rset instanceof SchemaRowSet) == false) {
throw new SqlIllegalArgumentException("No schema found inside {}", rset.getClass());
Expand All @@ -101,10 +105,10 @@ static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page p
}
}
columns = unmodifiableList(columns);
return createResponse(request, columns, page);
return createResponse(request, request.zoneId(), columns, page);
}

static SqlQueryResponse createResponse(SqlQueryRequest request, List<ColumnInfo> header, Page page) {
private static SqlQueryResponse createResponse(SqlQueryRequest request, ZoneId zoneId, List<ColumnInfo> header, Page page) {
List<List<Object>> rows = new ArrayList<>();
page.rowSet().forEachRow(rowView -> {
List<Object> row = new ArrayList<>(rowView.columnCount());
Expand All @@ -113,7 +117,7 @@ static SqlQueryResponse createResponse(SqlQueryRequest request, List<ColumnInfo>
});

return new SqlQueryResponse(
Cursors.encodeToString(page.next(), request.zoneId()),
Cursors.encodeToString(page.next(), zoneId),
request.mode(),
request.columnar(),
header,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,6 @@ static String encodeToString(Cursor info, Version version, ZoneId zoneId) {
}
}


/**
* Read a {@linkplain Cursor} from a string.
*/
public static Cursor decodeFromString(String base64) {
return decodeFromStringWithZone(base64).v1();
}

/**
* Read a {@linkplain Cursor} from a string.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.xpack.sql.execution.search.extractor.ComputingExtractorTests;
import org.elasticsearch.xpack.sql.execution.search.extractor.ConstantExtractorTests;
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor;
import org.elasticsearch.xpack.sql.plugin.CursorTests;
import org.elasticsearch.xpack.sql.session.Cursors;

import java.io.IOException;
Expand Down Expand Up @@ -68,6 +69,6 @@ protected ScrollCursor copyInstance(ScrollCursor instance, Version version) thro
if (randomBoolean()) {
return super.copyInstance(instance, version);
}
return (ScrollCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone()));
return (ScrollCursor) CursorTests.decodeFromString(Cursors.encodeToString(instance, randomZone()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,18 @@ static Cursor randomNonEmptyCursor() {

public void testVersionHandling() {
Cursor cursor = randomNonEmptyCursor();
assertEquals(cursor, Cursors.decodeFromString(Cursors.encodeToString(cursor, randomZone())));
assertEquals(cursor, decodeFromString(Cursors.encodeToString(cursor, randomZone())));

Version nextMinorVersion = Version.fromId(Version.CURRENT.id + 10000);

String encodedWithWrongVersion = CursorsTestUtil.encodeToString(cursor, nextMinorVersion, randomZone());
SqlException exception = expectThrows(SqlException.class, () -> Cursors.decodeFromString(encodedWithWrongVersion));
SqlException exception = expectThrows(SqlException.class, () -> decodeFromString(encodedWithWrongVersion));

assertEquals(LoggerMessageFormat.format("Unsupported cursor version [{}], expected [{}]", nextMinorVersion, Version.CURRENT),
exception.getMessage());
}
}

public static Cursor decodeFromString(String base64) {
return Cursors.decodeFromStringWithZone(base64).v1();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.test.AbstractWireTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
import org.elasticsearch.xpack.sql.plugin.CursorTests;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListCursorTests extends AbstractWireTestCase<ListCursor> {
public class ListCursorTests extends AbstractSqlWireSerializingTestCase<ListCursor> {
public static ListCursor randomPagingListCursor() {
int size = between(1, 20);
int depth = between(1, 20);
Expand Down Expand Up @@ -44,13 +46,18 @@ protected ListCursor createTestInstance() {
return randomPagingListCursor();
}

@Override
protected Writeable.Reader<ListCursor> instanceReader() {
return ListCursor::new;
}

@Override
protected ListCursor copyInstance(ListCursor instance, Version version) throws IOException {
/* Randomly choose between internal protocol round trip and String based
* round trips used to toXContent. */
if (randomBoolean()) {
return copyWriteable(instance, getNamedWriteableRegistry(), ListCursor::new, version);
}
return (ListCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone()));
return (ListCursor) CursorTests.decodeFromString(Cursors.encodeToString(instance, randomZone()));
}
}
}

0 comments on commit 3c8ea17

Please sign in to comment.