From 3c8ea17e0bc92352e83d3afee294064be21eab08 Mon Sep 17 00:00:00 2001 From: Marios Trivyzas Date: Tue, 11 Feb 2020 14:59:06 +0100 Subject: [PATCH] SQL: Fix issue with timezone when paginating (#52101) Previously, when the specified (or default) fetchSize led to subsequent HTTP requests and the usage of cursors, those subsequent were no longer using the client timezone specified in the initial SQL query. As a consequence, Even though the query is executed once (with the correct timezone) the processing of the query results by the HitExtractors in the next pages was done using the default timezone Z. This could lead to incorrect results. Fix the issue by correctly using the initially specified timezone, which is found in the deserialisation of the cursor string. Fixes: #51258 (cherry picked from commit 8f7afbdeb9295999b48a6c36db5b31cbe0cee432) --- .../xpack/sql/qa/jdbc/FetchSizeTestCase.java | 52 +++++++++++++- .../xpack/sql/qa/rest/RestSqlTestCase.java | 70 ++++++++++++++++++- .../plugin/TransportSqlClearCursorAction.java | 2 +- .../sql/plugin/TransportSqlQueryAction.java | 18 +++-- .../xpack/sql/session/Cursors.java | 8 --- .../execution/search/ScrollCursorTests.java | 3 +- .../xpack/sql/plugin/CursorTests.java | 10 ++- .../xpack/sql/session/ListCursorTests.java | 15 ++-- 8 files changed, 152 insertions(+), 26 deletions(-) diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java index 61cd6e93c1831..4d1d55c1b3771 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java @@ -16,7 +16,12 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Properties; +import static org.elasticsearch.xpack.sql.qa.jdbc.JdbcTestUtils.JDBC_TIMEZONE; import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.assertNoSearchContexts; /** @@ -102,6 +107,51 @@ public void testIncompleteScroll() throws Exception { assertNoSearchContexts(); } + public void testScrollWithDatetimeAndTimezoneParam() throws IOException, SQLException { + Request request = new Request("PUT", "/test_date_timezone"); + XContentBuilder createIndex = JsonXContent.contentBuilder().startObject(); + createIndex.startObject("mappings"); + { + createIndex.startObject("properties"); + { + createIndex.startObject("date").field("type", "date").field("format", "epoch_millis"); + createIndex.endObject(); + } + createIndex.endObject(); + } + createIndex.endObject().endObject(); + request.setJsonEntity(Strings.toString(createIndex)); + client().performRequest(request); + + request = new Request("PUT", "/test_date_timezone/_bulk"); + request.addParameter("refresh", "true"); + StringBuilder bulk = new StringBuilder(); + long[] datetimes = new long[] { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 }; + for (long datetime : datetimes) { + bulk.append("{\"index\":{}}\n"); + bulk.append("{\"date\":").append(datetime).append("}\n"); + } + request.setJsonEntity(bulk.toString()); + assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + + ZoneId zoneId = randomZone(); + Properties connectionProperties = connectionProperties(); + connectionProperties.put(JDBC_TIMEZONE, zoneId.toString()); + try (Connection c = esJdbc(connectionProperties); + Statement s = c.createStatement()) { + s.setFetchSize(2); + try (ResultSet rs = + s.executeQuery("SELECT DATE_PART('TZOFFSET', date) FROM test_date_timezone ORDER BY date")) { + for (int i = 0; i < datetimes.length; i++) { + assertEquals(2, rs.getFetchSize()); + assertTrue("No more entries left at " + i, rs.next()); + assertEquals(ZonedDateTime.ofInstant(Instant.ofEpochMilli(datetimes[i]), zoneId).getOffset() + .getTotalSeconds()/ 60, rs.getInt(1)); + } + assertFalse(rs.next()); + } + } + } /** * Test for {@code SELECT} that is implemented as an aggregation. @@ -237,4 +287,4 @@ private void addPivotData() throws Exception { request.setJsonEntity(bulk.toString()); assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java index 46c5bd9d406c8..c8c38ee1adcae 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.sql.qa.rest; import com.fasterxml.jackson.core.io.JsonStringEncoder; - import org.apache.http.HttpEntity; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; @@ -15,8 +14,10 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.NotEqualMessageBuilder; @@ -31,6 +32,9 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.sql.JDBCType; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -151,6 +155,70 @@ public void testNextPage() throws IOException { ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode)); } + public void testNextPageWithDatetimeAndTimezoneParam() throws IOException { + Request request = new Request("PUT", "/test_date_timezone"); + XContentBuilder createIndex = JsonXContent.contentBuilder().startObject(); + createIndex.startObject("mappings"); + { + createIndex.startObject("properties"); + { + createIndex.startObject("date").field("type", "date").field("format", "epoch_millis"); + createIndex.endObject(); + } + createIndex.endObject(); + } + createIndex.endObject().endObject(); + request.setJsonEntity(Strings.toString(createIndex)); + client().performRequest(request); + + request = new Request("PUT", "/test_date_timezone/_bulk"); + request.addParameter("refresh", "true"); + StringBuilder bulk = new StringBuilder(); + long[] datetimes = new long[] { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 }; + for (long datetime : datetimes) { + bulk.append("{\"index\":{}}\n"); + bulk.append("{\"date\":").append(datetime).append("}\n"); + } + request.setJsonEntity(bulk.toString()); + assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + + ZoneId zoneId = randomZone(); + String mode = randomMode(); + String sqlRequest = + "{\"query\":\"SELECT DATE_PART('TZOFFSET', date) AS tz FROM test_date_timezone ORDER BY date\"," + + "\"time_zone\":\"" + zoneId.getId() + "\", " + + "\"mode\":\"" + mode + "\", " + + "\"fetch_size\":2}"; + + String cursor = null; + for (int i = 0; i <= datetimes.length; i += 2) { + Map expected = new HashMap<>(); + Map response; + + if (i == 0) { + expected.put("columns", singletonList(columnInfo(mode, "tz", "integer", JDBCType.INTEGER, 11))); + response = runSql(new StringEntity(sqlRequest, ContentType.APPLICATION_JSON), "", mode); + } else { + response = runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(mode) + "}", + ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode); + } + + List values = new ArrayList<>(2); + for (int j = 0; j < (i < datetimes.length - 1 ? 2 : 1); j++) { + values.add(singletonList(ZonedDateTime.ofInstant(Instant.ofEpochMilli(datetimes[i + j]), zoneId) + .getOffset().getTotalSeconds() / 60)); + } + expected.put("rows", values); + cursor = (String) response.remove("cursor"); + assertResponse(expected, response); + assertNotNull(cursor); + } + Map expected = new HashMap<>(); + expected.put("rows", emptyList()); + assertResponse(expected, runSql(new StringEntity("{ \"cursor\":\"" + cursor + "\"" + mode(mode) + "}", + ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode)); + } + @AwaitsFix(bugUrl = "Unclear status, https://github.com/elastic/x-pack-elasticsearch/issues/2074") public void testTimeZone() throws IOException { String mode = randomMode(); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlClearCursorAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlClearCursorAction.java index 6d3802c341f1d..b6ffb45a36198 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlClearCursorAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlClearCursorAction.java @@ -43,7 +43,7 @@ protected void doExecute(Task task, SqlClearCursorRequest request, ActionListene public static void operation(PlanExecutor planExecutor, SqlClearCursorRequest request, ActionListener listener) { - Cursor cursor = Cursors.decodeFromString(request.getCursor()); + Cursor cursor = Cursors.decodeFromStringWithZone(request.getCursor()).v1(); planExecutor.cleanCursor( new Configuration(DateUtils.UTC, Protocol.FETCH_SIZE, Protocol.REQUEST_TIMEOUT, Protocol.PAGE_TIMEOUT, null, request.mode(), StringUtils.EMPTY, StringUtils.EMPTY, StringUtils.EMPTY, Protocol.FIELD_MULTI_VALUE_LENIENCY, diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java index 97da20902a0d3..52bb1f297cb2c 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; @@ -25,12 +26,14 @@ import org.elasticsearch.xpack.sql.proto.ColumnInfo; import org.elasticsearch.xpack.sql.proto.Mode; import org.elasticsearch.xpack.sql.session.Configuration; +import org.elasticsearch.xpack.sql.session.Cursor; import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Cursors; import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.type.Schema; +import java.time.ZoneId; import java.util.ArrayList; import java.util.List; @@ -67,7 +70,7 @@ protected void doExecute(Task task, SqlQueryRequest request, ActionListener listener, + static void operation(PlanExecutor planExecutor, SqlQueryRequest request, ActionListener listener, String username, String clusterName) { // The configuration is always created however when dealing with the next page, only the timeouts are relevant // the rest having default values (since the query is already created) @@ -79,13 +82,14 @@ public static void operation(PlanExecutor planExecutor, SqlQueryRequest request, planExecutor.sql(cfg, request.query(), request.params(), wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), listener::onFailure)); } else { - planExecutor.nextPage(cfg, Cursors.decodeFromString(request.cursor()), - wrap(p -> listener.onResponse(createResponse(request, null, p)), + Tuple decoded = Cursors.decodeFromStringWithZone(request.cursor()); + planExecutor.nextPage(cfg, decoded.v1(), + wrap(p -> listener.onResponse(createResponse(request, decoded.v2(), null, p)), listener::onFailure)); } } - static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) { + private static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) { RowSet rset = page.rowSet(); if ((rset instanceof SchemaRowSet) == false) { throw new SqlIllegalArgumentException("No schema found inside {}", rset.getClass()); @@ -101,10 +105,10 @@ static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page p } } columns = unmodifiableList(columns); - return createResponse(request, columns, page); + return createResponse(request, request.zoneId(), columns, page); } - static SqlQueryResponse createResponse(SqlQueryRequest request, List header, Page page) { + private static SqlQueryResponse createResponse(SqlQueryRequest request, ZoneId zoneId, List header, Page page) { List> rows = new ArrayList<>(); page.rowSet().forEachRow(rowView -> { List row = new ArrayList<>(rowView.columnCount()); @@ -113,7 +117,7 @@ static SqlQueryResponse createResponse(SqlQueryRequest request, List }); return new SqlQueryResponse( - Cursors.encodeToString(page.next(), request.zoneId()), + Cursors.encodeToString(page.next(), zoneId), request.mode(), request.columnar(), header, diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java index 6f1ee47f4da34..3282f0bb99641 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java @@ -83,14 +83,6 @@ static String encodeToString(Cursor info, Version version, ZoneId zoneId) { } } - - /** - * Read a {@linkplain Cursor} from a string. - */ - public static Cursor decodeFromString(String base64) { - return decodeFromStringWithZone(base64).v1(); - } - /** * Read a {@linkplain Cursor} from a string. */ diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java index ca135d5170fd4..26d9679157eac 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.xpack.sql.execution.search.extractor.ComputingExtractorTests; import org.elasticsearch.xpack.sql.execution.search.extractor.ConstantExtractorTests; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; +import org.elasticsearch.xpack.sql.plugin.CursorTests; import org.elasticsearch.xpack.sql.session.Cursors; import java.io.IOException; @@ -68,6 +69,6 @@ protected ScrollCursor copyInstance(ScrollCursor instance, Version version) thro if (randomBoolean()) { return super.copyInstance(instance, version); } - return (ScrollCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone())); + return (ScrollCursor) CursorTests.decodeFromString(Cursors.encodeToString(instance, randomZone())); } } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java index 28d9c278ed163..0523af135686e 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java @@ -104,14 +104,18 @@ static Cursor randomNonEmptyCursor() { public void testVersionHandling() { Cursor cursor = randomNonEmptyCursor(); - assertEquals(cursor, Cursors.decodeFromString(Cursors.encodeToString(cursor, randomZone()))); + assertEquals(cursor, decodeFromString(Cursors.encodeToString(cursor, randomZone()))); Version nextMinorVersion = Version.fromId(Version.CURRENT.id + 10000); String encodedWithWrongVersion = CursorsTestUtil.encodeToString(cursor, nextMinorVersion, randomZone()); - SqlException exception = expectThrows(SqlException.class, () -> Cursors.decodeFromString(encodedWithWrongVersion)); + SqlException exception = expectThrows(SqlException.class, () -> decodeFromString(encodedWithWrongVersion)); assertEquals(LoggerMessageFormat.format("Unsupported cursor version [{}], expected [{}]", nextMinorVersion, Version.CURRENT), exception.getMessage()); } -} \ No newline at end of file + + public static Cursor decodeFromString(String base64) { + return Cursors.decodeFromStringWithZone(base64).v1(); + } +} diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java index 2fd88e3c0bc8c..bf3fc548ef6e7 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java @@ -7,14 +7,16 @@ import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.test.AbstractWireTestCase; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase; +import org.elasticsearch.xpack.sql.plugin.CursorTests; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -public class ListCursorTests extends AbstractWireTestCase { +public class ListCursorTests extends AbstractSqlWireSerializingTestCase { public static ListCursor randomPagingListCursor() { int size = between(1, 20); int depth = between(1, 20); @@ -44,6 +46,11 @@ protected ListCursor createTestInstance() { return randomPagingListCursor(); } + @Override + protected Writeable.Reader instanceReader() { + return ListCursor::new; + } + @Override protected ListCursor copyInstance(ListCursor instance, Version version) throws IOException { /* Randomly choose between internal protocol round trip and String based @@ -51,6 +58,6 @@ protected ListCursor copyInstance(ListCursor instance, Version version) throws I if (randomBoolean()) { return copyWriteable(instance, getNamedWriteableRegistry(), ListCursor::new, version); } - return (ListCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone())); + return (ListCursor) CursorTests.decodeFromString(Cursors.encodeToString(instance, randomZone())); } -} \ No newline at end of file +}