From ae4c71a993d24e9994b861a8ebf98aee0efbeca2 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Wed, 25 Sep 2024 22:03:01 +0200 Subject: [PATCH 1/7] Remove unused class --- .../encoding/JsonQueryDataDecoder.java | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/client/trino-client/src/main/java/io/trino/client/spooling/encoding/JsonQueryDataDecoder.java b/client/trino-client/src/main/java/io/trino/client/spooling/encoding/JsonQueryDataDecoder.java index 358ef396b86c..bd282b7c539c 100644 --- a/client/trino-client/src/main/java/io/trino/client/spooling/encoding/JsonQueryDataDecoder.java +++ b/client/trino-client/src/main/java/io/trino/client/spooling/encoding/JsonQueryDataDecoder.java @@ -13,8 +13,6 @@ */ package io.trino.client.spooling.encoding; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import io.trino.client.Column; @@ -105,29 +103,4 @@ public String encoding() return super.encoding() + "+lz4"; } } - - public static class JsonSchema - { - private final int[] offsets; - private final int step; - - @JsonCreator - public JsonSchema(int[] offsets, int step) - { - this.offsets = offsets; - this.step = step; - } - - @JsonProperty("offsets") - public int[] getOffsets() - { - return offsets; - } - - @JsonProperty("step") - public int getStep() - { - return step; - } - } } From 556426bbc469b97f11d9b2d710223ba7048d2bf9 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Wed, 25 Sep 2024 22:09:44 +0200 Subject: [PATCH 2/7] Decompress Zstd in a single pass --- .../encoding/ZstdQueryDataDecoder.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/client/trino-client/src/main/java/io/trino/client/spooling/encoding/ZstdQueryDataDecoder.java 
b/client/trino-client/src/main/java/io/trino/client/spooling/encoding/ZstdQueryDataDecoder.java index 696ae3631920..58d61b86a8e9 100644 --- a/client/trino-client/src/main/java/io/trino/client/spooling/encoding/ZstdQueryDataDecoder.java +++ b/client/trino-client/src/main/java/io/trino/client/spooling/encoding/ZstdQueryDataDecoder.java @@ -13,11 +13,16 @@ */ package io.trino.client.spooling.encoding; -import io.airlift.compress.zstd.ZstdInputStream; +import com.google.common.io.ByteStreams; +import io.airlift.compress.zstd.ZstdDecompressor; import io.trino.client.QueryDataDecoder; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; +import static java.lang.String.format; + public class ZstdQueryDataDecoder extends CompressedQueryDataDecoder { @@ -27,9 +32,18 @@ public ZstdQueryDataDecoder(QueryDataDecoder delegate) } @Override - InputStream decompress(InputStream inputStream, int uncompressedSize) + InputStream decompress(InputStream stream, int uncompressedSize) + throws IOException { - return new ZstdInputStream(inputStream); + ZstdDecompressor decompressor = new ZstdDecompressor(); + byte[] bytes = ByteStreams.toByteArray(stream); + byte[] output = new byte[uncompressedSize]; + + int decompressedSize = decompressor.decompress(bytes, 0, bytes.length, output, 0, output.length); + if (decompressedSize != uncompressedSize) { + throw new IOException(format("Decompressed size does not match expected segment size, expected %d, got %d", decompressedSize, uncompressedSize)); + } + return new ByteArrayInputStream(output); } @Override From c18eae18e7a911195804c327c528a89470c7c672 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Wed, 25 Sep 2024 22:47:14 +0200 Subject: [PATCH 3/7] Convert and get TimeZone/DateTimeZone once --- .../io/trino/jdbc/AbstractTrinoResultSet.java | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git 
a/client/trino-jdbc/src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java b/client/trino-jdbc/src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java index ad42b3ddba4a..a502478b16e6 100644 --- a/client/trino-jdbc/src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java +++ b/client/trino-jdbc/src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java @@ -118,6 +118,9 @@ abstract class AbstractTrinoResultSet private static final int MAX_DATETIME_PRECISION = 12; + private static final DateTimeZone CURRENT_TIME_ZONE = DateTimeZone.forID(ZoneId.systemDefault().getId()); + private static final TimeZone CURRENT_JAVA_TIME_ZONE = TimeZone.getTimeZone(ZoneId.of(CURRENT_TIME_ZONE.getID())); + private static final int MILLISECONDS_PER_SECOND = 1000; private static final int MILLISECONDS_PER_MINUTE = 60 * MILLISECONDS_PER_SECOND; private static final long NANOSECONDS_PER_SECOND = 1_000_000_000; @@ -150,8 +153,8 @@ abstract class AbstractTrinoResultSet TypeConversions.builder() .add("decimal", String.class, BigDecimal.class, AbstractTrinoResultSet::parseBigDecimal) .add("varbinary", byte[].class, String.class, value -> "0x" + BaseEncoding.base16().encode(value)) - .add("date", String.class, Date.class, string -> parseDate(string, DateTimeZone.forID(ZoneId.systemDefault().getId()))) - .add("date", String.class, java.time.LocalDate.class, string -> parseDate(string, DateTimeZone.forID(ZoneId.systemDefault().getId())).toLocalDate()) + .add("date", String.class, Date.class, string -> parseDate(string, CURRENT_TIME_ZONE, CURRENT_JAVA_TIME_ZONE)) + .add("date", String.class, java.time.LocalDate.class, string -> parseDate(string, CURRENT_TIME_ZONE, CURRENT_JAVA_TIME_ZONE).toLocalDate()) .add("time", String.class, Time.class, string -> parseTime(string, ZoneId.systemDefault())) .add("time with time zone", String.class, Time.class, AbstractTrinoResultSet::parseTimeWithTimeZone) .add("timestamp", String.class, Timestamp.class, string -> parseTimestampAsSqlTimestamp(string, 
ZoneId.systemDefault())) @@ -179,8 +182,6 @@ abstract class AbstractTrinoResultSet return result; }) .build(); - - private final DateTimeZone resultTimeZone; protected final Iterator> results; private final Map fieldMap; private final List columnInfoList; @@ -193,8 +194,6 @@ abstract class AbstractTrinoResultSet AbstractTrinoResultSet(Optional statement, List columns, Iterator> results) { this.statement = requireNonNull(statement, "statement is null"); - this.resultTimeZone = DateTimeZone.forID(ZoneId.systemDefault().getId()); - requireNonNull(columns, "columns is null"); this.fieldMap = getFieldMap(columns); this.columnInfoList = getColumnInfo(columns); @@ -333,10 +332,10 @@ public byte[] getBytes(int columnIndex) public Date getDate(int columnIndex) throws SQLException { - return getDate(columnIndex, resultTimeZone); + return getDate(columnIndex, CURRENT_TIME_ZONE, CURRENT_JAVA_TIME_ZONE); } - private Date getDate(int columnIndex, DateTimeZone localTimeZone) + private Date getDate(int columnIndex, DateTimeZone localTimeZone, TimeZone localJavaTimeZone) throws SQLException { Object value = column(columnIndex); @@ -345,14 +344,14 @@ private Date getDate(int columnIndex, DateTimeZone localTimeZone) } try { - return parseDate(String.valueOf(value), localTimeZone); + return parseDate(String.valueOf(value), localTimeZone, localJavaTimeZone); } catch (IllegalArgumentException e) { throw new SQLException("Expected value to be a date but is: " + value, e); } } - private static Date parseDate(String value, DateTimeZone localTimeZone) + private static Date parseDate(String value, DateTimeZone localTimeZone, TimeZone localJavaTimeZone) { LocalDate localDate = DATE_FORMATTER.parseLocalDate(String.valueOf(value)); long millis = localDate.toDateTimeAtStartOfDay(localTimeZone).getMillis(); @@ -369,7 +368,7 @@ private static Date parseDate(String value, DateTimeZone localTimeZone) // are not compatible with java.sql.Date. 
LocalDate preGregorianDate = DATE_FORMATTER.parseLocalDate(String.valueOf(value)); Calendar calendar = new GregorianCalendar(preGregorianDate.getYear(), preGregorianDate.getMonthOfYear() - 1, preGregorianDate.getDayOfMonth()); - calendar.setTimeZone(TimeZone.getTimeZone(ZoneId.of(localTimeZone.getID()))); + calendar.setTimeZone(localJavaTimeZone); return new Date(calendar.getTimeInMillis()); } @@ -378,7 +377,7 @@ private static Date parseDate(String value, DateTimeZone localTimeZone) public Time getTime(int columnIndex) throws SQLException { - return getTime(columnIndex, resultTimeZone); + return getTime(columnIndex, CURRENT_TIME_ZONE); } private Time getTime(int columnIndex, DateTimeZone localTimeZone) @@ -415,7 +414,7 @@ private Time getTime(int columnIndex, DateTimeZone localTimeZone) public Timestamp getTimestamp(int columnIndex) throws SQLException { - return getTimestamp(columnIndex, resultTimeZone); + return getTimestamp(columnIndex, CURRENT_TIME_ZONE); } private Timestamp getTimestamp(int columnIndex, DateTimeZone localTimeZone) @@ -1351,7 +1350,7 @@ public Array getArray(String columnLabel) public Date getDate(int columnIndex, Calendar cal) throws SQLException { - return getDate(columnIndex, DateTimeZone.forTimeZone(cal.getTimeZone())); + return getDate(columnIndex, DateTimeZone.forTimeZone(cal.getTimeZone()), cal.getTimeZone()); } @Override From 2e2cf9c514bd032cc264f677de5793ed7913dc79 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Wed, 25 Sep 2024 23:42:15 +0200 Subject: [PATCH 4/7] Parse date only once --- .../src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/client/trino-jdbc/src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java b/client/trino-jdbc/src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java index a502478b16e6..85bef9c9d878 100644 --- a/client/trino-jdbc/src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java +++ 
b/client/trino-jdbc/src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java @@ -366,8 +366,7 @@ private static Date parseDate(String value, DateTimeZone localTimeZone, TimeZone // expensive GregorianCalendar; note that Joda also has a chronology that works for // older dates, but it uses a slightly different algorithm and yields results that // are not compatible with java.sql.Date. - LocalDate preGregorianDate = DATE_FORMATTER.parseLocalDate(String.valueOf(value)); - Calendar calendar = new GregorianCalendar(preGregorianDate.getYear(), preGregorianDate.getMonthOfYear() - 1, preGregorianDate.getDayOfMonth()); + Calendar calendar = new GregorianCalendar(localDate.getYear(), localDate.getMonthOfYear() - 1, localDate.getDayOfMonth()); calendar.setTimeZone(localJavaTimeZone); return new Date(calendar.getTimeInMillis()); From f5326e2251f94fce5a49d2c8c274128176002c13 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 26 Sep 2024 07:14:43 +0200 Subject: [PATCH 5/7] Avoid String.valueOf on a String --- .../src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/trino-jdbc/src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java b/client/trino-jdbc/src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java index 85bef9c9d878..796952756d77 100644 --- a/client/trino-jdbc/src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java +++ b/client/trino-jdbc/src/main/java/io/trino/jdbc/AbstractTrinoResultSet.java @@ -353,7 +353,7 @@ private Date getDate(int columnIndex, DateTimeZone localTimeZone, TimeZone local private static Date parseDate(String value, DateTimeZone localTimeZone, TimeZone localJavaTimeZone) { - LocalDate localDate = DATE_FORMATTER.parseLocalDate(String.valueOf(value)); + LocalDate localDate = DATE_FORMATTER.parseLocalDate(value); long millis = localDate.toDateTimeAtStartOfDay(localTimeZone).getMillis(); if (millis >= START_OF_MODERN_ERA_SECONDS * MILLISECONDS_PER_SECOND) { 
return new Date(millis); From ffafcde458a0c144a3f364927c750689d7868bd1 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 26 Sep 2024 11:01:46 +0200 Subject: [PATCH 6/7] Rename method argument to expectedDecompressedSize --- .../spooling/encoding/CompressedQueryDataDecoder.java | 8 ++++---- .../client/spooling/encoding/Lz4QueryDataDecoder.java | 8 ++++---- .../client/spooling/encoding/ZstdQueryDataDecoder.java | 8 ++++---- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/client/trino-client/src/main/java/io/trino/client/spooling/encoding/CompressedQueryDataDecoder.java b/client/trino-client/src/main/java/io/trino/client/spooling/encoding/CompressedQueryDataDecoder.java index dfab5aac8690..62e8f2404319 100644 --- a/client/trino-client/src/main/java/io/trino/client/spooling/encoding/CompressedQueryDataDecoder.java +++ b/client/trino-client/src/main/java/io/trino/client/spooling/encoding/CompressedQueryDataDecoder.java @@ -34,16 +34,16 @@ public CompressedQueryDataDecoder(QueryDataDecoder delegate) this.delegate = requireNonNull(delegate, "delegate is null"); } - abstract InputStream decompress(InputStream inputStream, int uncompressedSize) + abstract InputStream decompress(InputStream inputStream, int expectedDecompressedSize) throws IOException; @Override public Iterable> decode(InputStream stream, DataAttributes metadata) throws IOException { - Optional uncompressedSize = metadata.getOptional(DataAttribute.UNCOMPRESSED_SIZE, Integer.class); - if (uncompressedSize.isPresent()) { - return delegate.decode(decompress(stream, uncompressedSize.get()), metadata); + Optional expectedDecompressedSize = metadata.getOptional(DataAttribute.UNCOMPRESSED_SIZE, Integer.class); + if (expectedDecompressedSize.isPresent()) { + return delegate.decode(decompress(stream, expectedDecompressedSize.get()), metadata); } // Data not compressed - below threshold return delegate.decode(stream, metadata); diff --git 
a/client/trino-client/src/main/java/io/trino/client/spooling/encoding/Lz4QueryDataDecoder.java b/client/trino-client/src/main/java/io/trino/client/spooling/encoding/Lz4QueryDataDecoder.java index 8e1ebbe54cb5..826f51539a11 100644 --- a/client/trino-client/src/main/java/io/trino/client/spooling/encoding/Lz4QueryDataDecoder.java +++ b/client/trino-client/src/main/java/io/trino/client/spooling/encoding/Lz4QueryDataDecoder.java @@ -32,16 +32,16 @@ public Lz4QueryDataDecoder(QueryDataDecoder delegate) } @Override - InputStream decompress(InputStream stream, int uncompressedSize) + InputStream decompress(InputStream stream, int expectedDecompressedSize) throws IOException { Lz4Decompressor decompressor = new Lz4Decompressor(); byte[] bytes = ByteStreams.toByteArray(stream); - byte[] output = new byte[uncompressedSize]; + byte[] output = new byte[expectedDecompressedSize]; int decompressedSize = decompressor.decompress(bytes, 0, bytes.length, output, 0, output.length); - if (decompressedSize != uncompressedSize) { - throw new IOException(format("Decompressed size does not match expected segment size, expected %d, got %d", decompressedSize, uncompressedSize)); + if (decompressedSize != expectedDecompressedSize) { + throw new IOException(format("Decompressed size does not match expected segment size, expected %d, got %d", expectedDecompressedSize, decompressedSize)); } return new ByteArrayInputStream(output); } diff --git a/client/trino-client/src/main/java/io/trino/client/spooling/encoding/ZstdQueryDataDecoder.java b/client/trino-client/src/main/java/io/trino/client/spooling/encoding/ZstdQueryDataDecoder.java index 58d61b86a8e9..6ac0cec15a0c 100644 --- a/client/trino-client/src/main/java/io/trino/client/spooling/encoding/ZstdQueryDataDecoder.java +++ b/client/trino-client/src/main/java/io/trino/client/spooling/encoding/ZstdQueryDataDecoder.java @@ -32,16 +32,16 @@ public ZstdQueryDataDecoder(QueryDataDecoder delegate) } @Override - InputStream decompress(InputStream 
stream, int uncompressedSize) + InputStream decompress(InputStream stream, int expectedDecompressedSize) throws IOException { ZstdDecompressor decompressor = new ZstdDecompressor(); byte[] bytes = ByteStreams.toByteArray(stream); - byte[] output = new byte[uncompressedSize]; + byte[] output = new byte[expectedDecompressedSize]; int decompressedSize = decompressor.decompress(bytes, 0, bytes.length, output, 0, output.length); - if (decompressedSize != uncompressedSize) { - throw new IOException(format("Decompressed size does not match expected segment size, expected %d, got %d", decompressedSize, uncompressedSize)); + if (decompressedSize != expectedDecompressedSize) { + throw new IOException(format("Decompressed size does not match expected segment size, expected %d, got %d", expectedDecompressedSize, decompressedSize)); } return new ByteArrayInputStream(output); } From c6905f4b1de3a772c9bb0c327ce99ec8eb101ec3 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 26 Sep 2024 11:19:32 +0200 Subject: [PATCH 7/7] Raise compression threshold to 8KB --- .../server/protocol/spooling/encoding/Lz4QueryDataEncoder.java | 2 +- .../server/protocol/spooling/encoding/ZstdQueryDataEncoder.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/Lz4QueryDataEncoder.java b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/Lz4QueryDataEncoder.java index 07488eba52e5..287b29c84cc9 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/Lz4QueryDataEncoder.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/Lz4QueryDataEncoder.java @@ -22,7 +22,7 @@ public class Lz4QueryDataEncoder extends CompressedQueryDataEncoder { - private static final int COMPRESSION_THRESHOLD = 2048; + private static final int COMPRESSION_THRESHOLD = 8192; public Lz4QueryDataEncoder(QueryDataEncoder delegate) { diff 
--git a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/ZstdQueryDataEncoder.java b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/ZstdQueryDataEncoder.java index e916916a07bc..d4f91cc9e2c5 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/ZstdQueryDataEncoder.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/ZstdQueryDataEncoder.java @@ -22,7 +22,7 @@ public class ZstdQueryDataEncoder extends CompressedQueryDataEncoder { - private static final int COMPRESSION_THRESHOLD = 2048; + private static final int COMPRESSION_THRESHOLD = 8192; public ZstdQueryDataEncoder(QueryDataEncoder delegate) {