From f1122d3e8bbd6de5c5d1b9b804f8780c17636022 Mon Sep 17 00:00:00 2001 From: praveenkrishna Date: Thu, 28 Jul 2022 12:09:52 +0530 Subject: [PATCH] !fixup Implement timestamp predicate pushdown in Druid connector Co-authored-by: Liu Yang --- .../trino/plugin/druid/DruidJdbcClient.java | 4 +- .../plugin/druid/BaseDruidConnectorTest.java | 131 +++++++++++++++--- 2 files changed, 112 insertions(+), 23 deletions(-) diff --git a/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java b/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java index eeba7b62828..a1c208e6a30 100644 --- a/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java +++ b/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java @@ -134,6 +134,8 @@ public class DruidJdbcClient private static final String DRUID_CATALOG = "druid"; // All the datasources in Druid are created under schema "druid" private static final String DRUID_SCHEMA = "druid"; + + // TODO We could also re-evaluate this logic by using a new Calendar for each row if necessary private static final Calendar UTC_CALENDAR = Calendar.getInstance(TimeZone.getTimeZone(UTC)); private static final DateTimeFormatter LOCAL_DATE_TIME = new DateTimeFormatterBuilder() @@ -309,7 +311,7 @@ public static ColumnMapping timestampColumnMappingUsingSqlTimestampWithFullPushd return ColumnMapping.longMapping( timestampType, (resultSet, columnIndex) -> { - // Druis's ResultSet depends on JDBC Connection TimeZone, so we pass the Calendar to get the result at UTC. + // Druid's ResultSet depends on JDBC Connection TimeZone, so we pass the Calendar to get the result at UTC. Instant instant = Instant.ofEpochMilli(resultSet.getTimestamp(columnIndex, UTC_CALENDAR).getTime()); LocalDateTime timestamp = LocalDateTime.ofInstant(instant, UTC); return toTrinoTimestamp(timestampType, timestamp); diff --git a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/BaseDruidConnectorTest.java b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/BaseDruidConnectorTest.java index 5f9349fffc2..a17c967ccb7 100644 --- a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/BaseDruidConnectorTest.java +++ b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/BaseDruidConnectorTest.java @@ -31,6 +31,7 @@ import org.intellij.lang.annotations.Language; import org.testng.SkipException; import org.testng.annotations.AfterClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static io.trino.plugin.druid.DruidQueryRunner.copyAndIngestTpchData; @@ -38,7 +39,10 @@ import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.sql.planner.assertions.PlanMatchPattern.anyTree; import static io.trino.sql.planner.assertions.PlanMatchPattern.node; +import static io.trino.sql.planner.assertions.PlanMatchPattern.output; +import static io.trino.sql.planner.assertions.PlanMatchPattern.values; import static io.trino.testing.MaterializedResult.resultBuilder; +import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertFalse; @@ -396,61 +400,87 @@ public void testPredicatePushdown() } @Test - public void testPredicatePushdownForTimestamp() + public void testPredicatePushdownForTimestampWithSecondsPrecision() { // timestamp equality assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time = TIMESTAMP '1992-01-04 00:00:00'")) .matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))") .isFullyPushedDown(); - assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time = TIMESTAMP '1992-01-04 00:00:00.001'")) - .returnsEmptyResult() - .isNotFullyPushedDown(FilterNode.class); - // timestamp comparison assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time < TIMESTAMP '1992-01-05'")) .matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))") .isFullyPushedDown(); - assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time < TIMESTAMP '1992-01-05 00:00:00.001'")) - .matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))") - .isNotFullyPushedDown(FilterNode.class); - assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time <= TIMESTAMP '1992-01-04'")) .matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))") .isFullyPushedDown(); - assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time <= TIMESTAMP '1992-01-04 00:00:00.001'")) - .matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))") - .isNotFullyPushedDown(FilterNode.class); - assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time > TIMESTAMP '1998-11-28'")) .matches("VALUES " + "(BIGINT '2', BIGINT '370', CAST('RAIL' AS varchar)), " + "(BIGINT '2', BIGINT '468', CAST('AIR' AS varchar))") .isFullyPushedDown(); - assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time > TIMESTAMP '1998-11-28 00:00:00.001'")) + assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time >= TIMESTAMP '1998-11-29 00:00:00'")) .matches("VALUES " + "(BIGINT '2', BIGINT '370', CAST('RAIL' AS varchar)), " + "(BIGINT '2', BIGINT '468', CAST('AIR' AS varchar))") + .isFullyPushedDown(); + + // timestamp range + assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time BETWEEN TIMESTAMP '1992-01-01' AND TIMESTAMP '1992-01-05'")) + .matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))") + .isFullyPushedDown(); + + // varchar IN without domain compaction + assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time IN (TIMESTAMP '1992-01-04', TIMESTAMP '1998-11-27 00:00:00.000', TIMESTAMP '1998-11-28')")) + .matches("VALUES " + + "(BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar)), " + + "(BIGINT '1', BIGINT '574', CAST('AIR' AS varchar))") + .isFullyPushedDown(); + + // varchar IN with small compaction threshold + assertThat(query( + Session.builder(getSession()) + .setCatalogSessionProperty("druid", "domain_compaction_threshold", "1") + .build(), + "SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time IN (TIMESTAMP '1992-01-04', TIMESTAMP '1998-11-27 00:00:00', TIMESTAMP '1998-11-28')")) + .matches("VALUES " + + "(BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar)), " + + "(BIGINT '1', BIGINT '574', CAST('AIR' AS varchar))") + // Filter node is retained as no constraint is pushed into connector. .isNotFullyPushedDown(FilterNode.class); + } - assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time >= TIMESTAMP '1998-11-29 00:00:00'")) + @Test + public void testPredicatePushdownForTimestampWithMillisPrecision() + { + // timestamp equality + assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time = TIMESTAMP '1992-01-04 00:00:00.001'")) + .returnsEmptyResult() + .isNotFullyPushedDown(FilterNode.class); + + // timestamp comparison + assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time < TIMESTAMP '1992-01-05 00:00:00.001'")) + .matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))") + .isNotFullyPushedDown(FilterNode.class); + + assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time <= TIMESTAMP '1992-01-04 00:00:00.001'")) + .matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))") + .isNotFullyPushedDown(FilterNode.class); + + assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time > TIMESTAMP '1998-11-28 00:00:00.001'")) .matches("VALUES " + "(BIGINT '2', BIGINT '370', CAST('RAIL' AS varchar)), " + "(BIGINT '2', BIGINT '468', CAST('AIR' AS varchar))") - .isFullyPushedDown(); + .isNotFullyPushedDown(FilterNode.class); assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time >= TIMESTAMP '1998-11-29 00:00:00.001'")) .returnsEmptyResult() .isNotFullyPushedDown(FilterNode.class); // timestamp range - assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time BETWEEN TIMESTAMP '1992-01-01' AND TIMESTAMP '1992-01-05'")) - .matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))") - .isFullyPushedDown(); - assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time BETWEEN TIMESTAMP '1992-01-01 00:00:00.001' AND TIMESTAMP '1992-01-05'")) .matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))") .isNotFullyPushedDown(FilterNode.class); @@ -459,7 +489,7 @@ public void testPredicatePushdownForTimestamp() .matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))") .isNotFullyPushedDown(FilterNode.class); - // varchar IN without domain compaction + // timestamp IN without domain compaction assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time IN (TIMESTAMP '1992-01-04', TIMESTAMP '1998-11-27 00:00:00.000', TIMESTAMP '1998-11-28')")) .matches("VALUES " + "(BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar)), " + @@ -472,7 +502,7 @@ public void testPredicatePushdownForTimestamp() "(BIGINT '1', BIGINT '574', CAST('AIR' AS varchar))") .isNotFullyPushedDown(FilterNode.class); - // varchar IN with small compaction threshold + // timestamp IN with small compaction threshold assertThat(query( Session.builder(getSession()) .setCatalogSessionProperty("druid", "domain_compaction_threshold", "1") @@ -484,4 +514,61 @@ public void testPredicatePushdownForTimestamp() // Filter node is retained as no constraint is pushed into connector. .isNotFullyPushedDown(FilterNode.class); } + + @Test(dataProvider = "timestampValuesProvider") + public void testPredicatePushdownForTimestampWithHigherPrecision(String timestamp) + { + // timestamp equality + assertThat(query(format("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time = TIMESTAMP '%s'", timestamp))) + .returnsEmptyResult() + .matches(output( + values("linenumber", "partkey", "shipmode"))); + + // timestamp comparison + assertThat(query(format("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time < TIMESTAMP '%s'", timestamp))) + .matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))") + .isNotFullyPushedDown(FilterNode.class); + + assertThat(query(format("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time <= TIMESTAMP '%s'", timestamp))) + .matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))") + .isNotFullyPushedDown(FilterNode.class); + + assertThat(query(format("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time > (TIMESTAMP '%s' + INTERVAL '2520' DAY)", timestamp))) + .matches("VALUES " + + "(BIGINT '2', BIGINT '370', CAST('RAIL' AS varchar)), " + + "(BIGINT '2', BIGINT '468', CAST('AIR' AS varchar))") + .isNotFullyPushedDown(FilterNode.class); + + assertThat(query(format("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time >= (TIMESTAMP '%s' + INTERVAL '2521' DAY)", timestamp))) + .returnsEmptyResult() + .isNotFullyPushedDown(FilterNode.class); + + // timestamp range + assertThat(query(format("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time BETWEEN TIMESTAMP '1992-01-04' AND TIMESTAMP '%s'", timestamp))) + .matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))") + .isNotFullyPushedDown(FilterNode.class); + + // varchar IN without domain compaction + assertThat(query(format("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time IN (TIMESTAMP '1992-01-04', TIMESTAMP '1998-11-27', TIMESTAMP '%s')", timestamp))) + .matches("VALUES " + + "(BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar)), " + + "(BIGINT '1', BIGINT '574', CAST('AIR' AS varchar))") + .isNotFullyPushedDown(FilterNode.class); + } + + @DataProvider + public Object[][] timestampValuesProvider() + { + return new Object[][] { + {"1992-01-04 00:00:00.1234"}, + {"1992-01-04 00:00:00.12345"}, + {"1992-01-04 00:00:00.123456"}, + {"1992-01-04 00:00:00.1234567"}, + {"1992-01-04 00:00:00.12345678"}, + {"1992-01-04 00:00:00.123456789"}, + {"1992-01-04 00:00:00.1234567891"}, + {"1992-01-04 00:00:00.12345678912"}, + {"1992-01-04 00:00:00.123456789123"} + }; + } }