Respect timestamp coercion when coercing complex column
Previously, MILLISECONDS precision leaked when coercion was applied to a complex column, even if no coercion was applied to the nested timestamp field itself.
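For context, the regression is visible from plain SQL. A minimal sketch, assuming a Hive catalog named hive and a table shaped like the one in the product test below (a struct column timestamp_row_to_row row(keep timestamp, si2i smallint) whose Hive schema was altered so the struct is coerced on read); the table name is illustrative only:

    SET SESSION hive.timestamp_precision = 'NANOSECONDS';
    SELECT timestamp_row_to_row.keep FROM timestamp_coercion_table;
    -- before this change: the nested field still came back at timestamp(3) precision
    -- after this change: the nested field honors the session precision, timestamp(9)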
Praveen2112 committed Jun 28, 2023
1 parent e1a16eb commit 97153aa
Showing 4 changed files with 281 additions and 14 deletions.
@@ -212,8 +212,10 @@ public static Optional<ConnectorPageSource> createHivePageSource(
Optional<BucketAdaptation> bucketAdaptation = createBucketAdaptation(bucketConversion, tableBucketNumber, regularAndInterimColumnMappings);
Optional<BucketValidator> bucketValidator = createBucketValidator(path, bucketValidation, tableBucketNumber, regularAndInterimColumnMappings);

HiveTimestampPrecision timestampPrecision = getTimestampPrecision(session);

for (HivePageSourceFactory pageSourceFactory : pageSourceFactories) {
List<HiveColumnHandle> desiredColumns = toColumnHandles(regularAndInterimColumnMappings, true, typeManager);
List<HiveColumnHandle> desiredColumns = toColumnHandles(regularAndInterimColumnMappings, true, typeManager, timestampPrecision);

Optional<ReaderPageSource> readerWithProjections = pageSourceFactory.createPageSource(
configuration,
@@ -245,13 +247,13 @@ public static Optional<ConnectorPageSource> createHivePageSource(
bucketValidator,
adapter,
typeManager,
getTimestampPrecision(session),
timestampPrecision,
pageSource));
}
}

for (HiveRecordCursorProvider provider : cursorProviders) {
List<HiveColumnHandle> desiredColumns = toColumnHandles(regularAndInterimColumnMappings, false, typeManager);
List<HiveColumnHandle> desiredColumns = toColumnHandles(regularAndInterimColumnMappings, false, typeManager, timestampPrecision);
Optional<ReaderRecordCursorWithProjections> readerWithProjections = provider.createRecordCursor(
configuration,
session,
@@ -546,7 +548,7 @@ public static List<ColumnMapping> extractRegularAndInterimColumnMappings(List<Co
.collect(toImmutableList());
}

public static List<HiveColumnHandle> toColumnHandles(List<ColumnMapping> regularColumnMappings, boolean doCoercion, TypeManager typeManager)
public static List<HiveColumnHandle> toColumnHandles(List<ColumnMapping> regularColumnMappings, boolean doCoercion, TypeManager typeManager, HiveTimestampPrecision timestampPrecision)
{
return regularColumnMappings.stream()
.map(columnMapping -> {
@@ -562,14 +564,14 @@ public static List<HiveColumnHandle> toColumnHandles(List<ColumnMapping> regular
projectedColumn.getDereferenceIndices(),
projectedColumn.getDereferenceNames(),
fromHiveType,
fromHiveType.getType(typeManager));
fromHiveType.getType(typeManager, timestampPrecision));
});

return new HiveColumnHandle(
columnHandle.getBaseColumnName(),
columnHandle.getBaseHiveColumnIndex(),
fromHiveTypeBase,
fromHiveTypeBase.getType(typeManager),
fromHiveTypeBase.getType(typeManager, timestampPrecision),
newColumnProjectionInfo,
columnHandle.getColumnType(),
columnHandle.getComment());
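With the precision threaded through toColumnHandles, nested timestamp fields of coerced struct, array, and map columns now follow the hive.timestamp_precision session property. A quick illustration against the same hypothetical table as above, mirroring the SHOW COLUMNS assertion in the product test changes below:

    SET SESSION hive.timestamp_precision = 'MICROSECONDS';
    SHOW COLUMNS FROM timestamp_coercion_table;
    -- timestamp_row_to_row   | row(keep timestamp(6), si2i integer)
    -- timestamp_list_to_list | array(row(keep timestamp(6), si2i integer))
    -- timestamp_map_to_map   | map(integer, row(keep timestamp(6), si2i integer))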
@@ -16,7 +16,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import io.trino.jdbc.TrinoArray;
import io.trino.plugin.hive.HiveTimestampPrecision;
import io.trino.tempto.assertions.QueryAssert.Row;
import io.trino.tempto.fulfillment.table.MutableTablesState;
import io.trino.tempto.fulfillment.table.TableDefinition;
@@ -28,10 +30,13 @@

import java.math.BigDecimal;
import java.sql.JDBCType;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
@@ -41,9 +46,11 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.airlift.testing.Assertions.assertEqualsIgnoreOrder;
import static io.trino.plugin.hive.HiveTimestampPrecision.NANOSECONDS;
import static io.trino.tempto.assertions.QueryAssert.Row.row;
import static io.trino.tempto.context.ThreadLocalTestContextHolder.testContext;
import static io.trino.tempto.fulfillment.table.TableHandle.tableHandle;
import static io.trino.tests.product.utils.JdbcDriverUtils.setSessionProperty;
import static io.trino.tests.product.utils.QueryExecutors.onHive;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.lang.String.format;
@@ -59,6 +66,7 @@
import static java.sql.JDBCType.SMALLINT;
import static java.sql.JDBCType.STRUCT;
import static java.sql.JDBCType.VARCHAR;
import static java.util.Collections.nCopies;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
@@ -364,6 +372,132 @@ else if (getHiveVersionMajor() == 3 && isFormat.test("orc")) {
.buildOrThrow();
}

protected void doTestHiveCoercionWithDifferentTimestampPrecision(HiveTableDefinition tableDefinition)
{
String tableName = mutableTableInstanceOf(tableDefinition).getNameInDatabase();

// Insert all the data with nanoseconds precision
setHiveTimestampPrecision(NANOSECONDS);
onTrino().executeQuery(
"""
INSERT INTO %s
SELECT
(CAST(ROW (timestamp_value, -1) AS ROW(keep TIMESTAMP(9), si2i SMALLINT))),
ARRAY [CAST(ROW (timestamp_value, -1) AS ROW (keep TIMESTAMP(9), si2i SMALLINT))],
MAP (ARRAY [2], ARRAY [CAST(ROW (timestamp_value, -1) AS ROW (keep TIMESTAMP(9), si2i SMALLINT))]),
1
FROM (VALUES
(TIMESTAMP '2121-07-15 15:30:12.123499'),
(TIMESTAMP '2121-07-15 15:30:12.123500'),
(TIMESTAMP '2121-07-15 15:30:12.123501'),
(TIMESTAMP '2121-07-15 15:30:12.123499999'),
(TIMESTAMP '2121-07-15 15:30:12.123500000'),
(TIMESTAMP '2121-07-15 15:30:12.123500001')) AS t (timestamp_value)
""".formatted(tableName));

onHive().executeQuery(format("ALTER TABLE %s CHANGE COLUMN timestamp_row_to_row timestamp_row_to_row struct<keep:timestamp, si2i:int>", tableName));
onHive().executeQuery(format("ALTER TABLE %s CHANGE COLUMN timestamp_list_to_list timestamp_list_to_list array<struct<keep:timestamp, si2i:int>>", tableName));
onHive().executeQuery(format("ALTER TABLE %s CHANGE COLUMN timestamp_map_to_map timestamp_map_to_map map<int,struct<keep:timestamp, si2i:int>>", tableName));

for (HiveTimestampPrecision hiveTimestampPrecision : HiveTimestampPrecision.values()) {
setHiveTimestampPrecision(hiveTimestampPrecision);
assertThat(onTrino().executeQuery("SHOW COLUMNS FROM " + tableName).project(1, 2)).containsExactlyInOrder(
row("timestamp_row_to_row", "row(keep timestamp(%d), si2i integer)".formatted(hiveTimestampPrecision.getPrecision())),
row("timestamp_list_to_list", "array(row(keep timestamp(%d), si2i integer))".formatted(hiveTimestampPrecision.getPrecision())),
row("timestamp_map_to_map", "map(integer, row(keep timestamp(%d), si2i integer))".formatted(hiveTimestampPrecision.getPrecision())),
row("id", "bigint"));

List<String> allColumns = ImmutableList.of(
"timestamp_row_to_row",
"timestamp_list_to_list",
"timestamp_map_to_map",
"id");

// For Trino, remove unsupported columns
List<String> trinoReadColumns = removeUnsupportedColumnsForTrino(allColumns, tableName);
Map<String, List<Object>> expectedTrinoResults = Maps.filterKeys(
expectedRowsForEngineProvider(Engine.TRINO, hiveTimestampPrecision),
trinoReadColumns::contains);

String trinoReadQuery = format("SELECT %s FROM %s", String.join(", ", trinoReadColumns), tableName);
assertQueryResults(Engine.TRINO, trinoReadQuery, expectedTrinoResults, trinoReadColumns, 6, tableName);

List<String> hiveReadColumns = removeUnsupportedColumnsForHive(allColumns, tableName);
Map<String, List<Object>> expectedHiveResults = Maps.filterKeys(
expectedRowsForEngineProvider(Engine.HIVE, hiveTimestampPrecision),
hiveReadColumns::contains);

String hiveSelectQuery = format("SELECT %s FROM %s", String.join(", ", hiveReadColumns), tableName);
assertQueryResults(Engine.HIVE, hiveSelectQuery, expectedHiveResults, hiveReadColumns, 6, tableName);
}
}

protected Map<String, List<Object>> expectedRowsForEngineProvider(Engine engine, HiveTimestampPrecision timestampPrecision)
{
if (engine == Engine.HIVE) {
List<Object> baseData = ImmutableList.of(
"{\"keep\":\"2121-07-15 15:30:12.123499\",\"si2i\":-1}",
"{\"keep\":\"2121-07-15 15:30:12.1235\",\"si2i\":-1}",
"{\"keep\":\"2121-07-15 15:30:12.123501\",\"si2i\":-1}",
"{\"keep\":\"2121-07-15 15:30:12.123499999\",\"si2i\":-1}",
"{\"keep\":\"2121-07-15 15:30:12.1235\",\"si2i\":-1}",
"{\"keep\":\"2121-07-15 15:30:12.123500001\",\"si2i\":-1}");
return ImmutableMap.<String, List<Object>>builder()
.put("timestamp_row_to_row", baseData)
.put("timestamp_list_to_list", baseData.stream()
.map(ImmutableList::of)
.map(Objects::toString)
.collect(toImmutableList()))
.put("timestamp_map_to_map", baseData.stream()
.map("{2:%s}"::formatted)
.collect(toImmutableList()))
.put("id", nCopies(6, 1))
.buildOrThrow();
}

List<Timestamp> timestampValue = switch (timestampPrecision) {
case MILLISECONDS -> ImmutableList.of(
Timestamp.valueOf("2121-07-15 15:30:12.123"),
Timestamp.valueOf("2121-07-15 15:30:12.124"),
Timestamp.valueOf("2121-07-15 15:30:12.124"),
Timestamp.valueOf("2121-07-15 15:30:12.123"),
Timestamp.valueOf("2121-07-15 15:30:12.124"),
Timestamp.valueOf("2121-07-15 15:30:12.124"));
case MICROSECONDS -> ImmutableList.of(
Timestamp.valueOf("2121-07-15 15:30:12.123499"),
Timestamp.valueOf("2121-07-15 15:30:12.1235"),
Timestamp.valueOf("2121-07-15 15:30:12.123501"),
Timestamp.valueOf("2121-07-15 15:30:12.1235"),
Timestamp.valueOf("2121-07-15 15:30:12.1235"),
Timestamp.valueOf("2121-07-15 15:30:12.1235"));
case NANOSECONDS -> ImmutableList.of(
Timestamp.valueOf("2121-07-15 15:30:12.123499"),
Timestamp.valueOf("2121-07-15 15:30:12.1235"),
Timestamp.valueOf("2121-07-15 15:30:12.123501"),
Timestamp.valueOf("2121-07-15 15:30:12.123499999"),
Timestamp.valueOf("2121-07-15 15:30:12.1235"),
Timestamp.valueOf("2121-07-15 15:30:12.123500001"));
};

List<Object> baseData = timestampValue.stream()
.map(timestamp -> rowBuilder()
.addField("keep", timestamp)
.addField("si2i", -1)
.build())
.collect(toImmutableList());

return ImmutableMap.<String, List<Object>>builder()
.put("timestamp_row_to_row", baseData)
.put("timestamp_list_to_list", baseData.stream()
.map(ImmutableList::of)
.collect(toImmutableList()))
.put("timestamp_map_to_map", baseData.stream()
.map(entry -> ImmutableMap.of(2, entry))
.collect(toImmutableList()))
.put("id", nCopies(6, 1))
.buildOrThrow();
}
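
The expected rows above encode how the nanosecond values inserted earlier are rounded when read back at a lower precision. A standalone sketch of the same rounding, assuming plain CASTs reduce precision the same way the connector does on read:

    SELECT CAST(TIMESTAMP '2121-07-15 15:30:12.123500' AS TIMESTAMP(3));    -- 2121-07-15 15:30:12.124
    SELECT CAST(TIMESTAMP '2121-07-15 15:30:12.123499' AS TIMESTAMP(3));    -- 2121-07-15 15:30:12.123
    SELECT CAST(TIMESTAMP '2121-07-15 15:30:12.123499999' AS TIMESTAMP(6)); -- 2121-07-15 15:30:12.1235

At NANOSECONDS precision no rounding is needed, so the values round-trip unchanged.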

protected List<String> removeUnsupportedColumnsForHive(List<String> columns, String tableName)
{
// TODO: assert exceptions being thrown for each column
@@ -460,10 +594,16 @@ protected Map<ColumnContext, String> expectedExceptionsWithHiveContext()
.put(columnContext("1.2", "orc", "map_to_map"), "Unknown encoding kind: DIRECT_V2")
// Parquet
.put(columnContext("1.2", "parquet", "list_to_list"), "java.lang.UnsupportedOperationException: Cannot inspect java.util.ArrayList")
.put(columnContext("1.2", "parquet", "timestamp_row_to_row"), "Timestamp value coerced to a different value due to zone difference in HiveServer")
.put(columnContext("1.2", "parquet", "timestamp_list_to_list"), "java.lang.UnsupportedOperationException: Cannot inspect java.util.ArrayList")
.put(columnContext("1.2", "parquet", "timestamp_map_to_map"), "java.lang.UnsupportedOperationException: Cannot inspect java.util.ArrayList")
// Rcbinary
.put(columnContext("1.2", "rcbinary", "row_to_row"), "java.util.ArrayList cannot be cast to org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct")
.put(columnContext("1.2", "rcbinary", "list_to_list"), "java.util.ArrayList cannot be cast to org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryArray")
.put(columnContext("1.2", "rcbinary", "map_to_map"), "java.util.HashMap cannot be cast to org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryMap")
.put(columnContext("1.2", "rcbinary", "timestamp_row_to_row"), "java.util.ArrayList cannot be cast to org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct")
.put(columnContext("1.2", "rcbinary", "timestamp_list_to_list"), "java.util.ArrayList cannot be cast to org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryArray")
.put(columnContext("1.2", "rcbinary", "timestamp_map_to_map"), "java.util.HashMap cannot be cast to org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryMap")
//
// 2.1
// Parquet
Expand All @@ -490,6 +630,9 @@ protected Map<ColumnContext, String> expectedExceptionsWithHiveContext()
.put(columnContext("3.1", "rcbinary", "row_to_row"), "java.util.ArrayList cannot be cast to org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct")
.put(columnContext("3.1", "rcbinary", "list_to_list"), "java.util.ArrayList cannot be cast to org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryArray")
.put(columnContext("3.1", "rcbinary", "map_to_map"), "java.util.LinkedHashMap cannot be cast to org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryMap")
.put(columnContext("3.1", "rcbinary", "timestamp_row_to_row"), "java.util.ArrayList cannot be cast to org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct")
.put(columnContext("3.1", "rcbinary", "timestamp_list_to_list"), "java.util.ArrayList cannot be cast to org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryArray")
.put(columnContext("3.1", "rcbinary", "timestamp_map_to_map"), "java.util.LinkedHashMap cannot be cast to org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryMap")
.buildOrThrow();
}

@@ -527,15 +670,15 @@ private void assertQueryResults(
for (int sqlIndex = 1; sqlIndex <= columns.size(); sqlIndex++) {
String column = columns.get(sqlIndex - 1);

if (column.equals("row_to_row") || column.equals("map_to_map")) {
if (column.contains("row_to_row") || column.contains("map_to_map")) {
assertEqualsIgnoreOrder(
actual.column(sqlIndex),
column(expectedRows, sqlIndex),
format("%s field is not equal", column));
continue;
}

if (column.equals("list_to_list")) {
if (column.contains("list_to_list")) {
assertEqualsIgnoreOrder(
engine == Engine.TRINO ? extract(actual.column(sqlIndex)) : actual.column(sqlIndex),
column(expectedRows, sqlIndex),
@@ -636,6 +779,10 @@ private void assertColumnTypes(
.put("timestamp_to_string", VARCHAR)
.put("timestamp_to_bounded_varchar", VARCHAR)
.put("timestamp_to_smaller_varchar", VARCHAR)
.put("timestamp_to_varchar", VARCHAR)
.put("timestamp_row_to_row", engine == Engine.TRINO ? JAVA_OBJECT : STRUCT) // row
.put("timestamp_list_to_list", ARRAY) // list
.put("timestamp_map_to_map", JAVA_OBJECT) // map
.buildOrThrow();

assertThat(queryResult)
@@ -763,4 +910,14 @@ private static QueryResult execute(Engine engine, String sql, QueryExecutor.Quer
{
return engine.queryExecutor().executeQuery(sql, params);
}

private static void setHiveTimestampPrecision(HiveTimestampPrecision hiveTimestampPrecision)
{
try {
setSessionProperty(onTrino().getConnection(), "hive.timestamp_precision", hiveTimestampPrecision.name());
}
catch (SQLException e) {
throw new RuntimeException(e);
}
}
}