Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support writing timestamp with time zone type on partitioned column in Delta #18353

Merged
merged 1 commit into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,46 @@ public void testTimestampPredicatePushdown(String value)
results -> {});
}

/**
 * Checks that a table partitioned by a TIMESTAMP WITH TIME ZONE column can be written and read back.
 * Four rows are inserted (NULL, a year-0001 value, a value with a non-UTC offset and a fourth fractional
 * digit, and a year-9999 value); the test then verifies the query result, the SHOW STATS output and the
 * partition directory name embedded in "$path" for each row.
 */
@Test
public void testTimestampWithTimeZonePartition()
{
String tableName = "test_timestamp_tz_partition_" + randomNameSuffix();

assertUpdate("DROP TABLE IF EXISTS " + tableName);
assertUpdate("CREATE TABLE " + tableName + "(id INT, part TIMESTAMP WITH TIME ZONE) WITH (partitioned_by = ARRAY['part'])");
// Row 3 is written with a -01:00 offset and four fractional digits (.9999); the assertions below
// expect it back as 02:02:04.000 UTC, i.e. normalized to UTC and rounded to the next millisecond.
assertUpdate(
"INSERT INTO " + tableName + " VALUES " +
"(1, NULL)," +
"(2, TIMESTAMP '0001-01-01 00:00:00.000 UTC')," +
"(3, TIMESTAMP '2023-07-20 01:02:03.9999 -01:00')," +
"(4, TIMESTAMP '9999-12-31 23:59:59.999 UTC')",
4);

// All values are expected back in UTC with millisecond precision.
assertThat(query("SELECT * FROM " + tableName))
.matches("VALUES " +
"(1, NULL)," +
"(2, TIMESTAMP '0001-01-01 00:00:00.000 UTC')," +
"(3, TIMESTAMP '2023-07-20 02:02:04.000 UTC')," +
"(4, TIMESTAMP '9999-12-31 23:59:59.999 UTC')");
// For the partition column the expected stats row carries 3.0 and 0.25 (presumably distinct-value count
// and null fraction: three non-null values, one NULL out of four rows) and no low/high values.
assertQuery(
"SHOW STATS FOR " + tableName,
"VALUES " +
"('id', null, 4.0, 0.0, null, 1, 4)," +
"('part', null, 3.0, 0.25, null, null, null)," +
"(null, null, null, null, 4.0, null, null)");

// The row with a NULL partition value is expected under the __HIVE_DEFAULT_PARTITION__ directory.
assertThat((String) computeScalar("SELECT \"$path\" FROM " + tableName + " WHERE id = 1"))
.contains("/part=__HIVE_DEFAULT_PARTITION__/");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is "HIVE_DEFAULT_PARTITION" also the Delta's convention for paths for null part key?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's verified in a product test:

assertThat(onTrino().executeQuery("SELECT \"$path\" FROM delta.default." + tableName + " WHERE a_number IS NULL").column(1))
.hasSize(2)
.allMatch(path -> ((String) path).contains("/a_number=__HIVE_DEFAULT_PARTITION__/"));

// Non-null partition values are expected in the path as UTC date-times with ':' encoded as %3A.
// The expected directory names for rows 2 and 3 carry no fractional seconds, while row 4 keeps ".999".
assertThat((String) computeScalar("SELECT \"$path\" FROM " + tableName + " WHERE id = 2"))
.contains("/part=0001-01-01 00%3A00%3A00/");
assertThat((String) computeScalar("SELECT \"$path\" FROM " + tableName + " WHERE id = 3"))
.contains("/part=2023-07-20 02%3A02%3A04/");
assertThat((String) computeScalar("SELECT \"$path\" FROM " + tableName + " WHERE id = 4"))
.contains("/part=9999-12-31 23%3A59%3A59.999/");

assertUpdate("DROP TABLE " + tableName);
}

@DataProvider
public Object[][] timestampValues()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -86,6 +87,7 @@
import java.util.Optional;
import java.util.Properties;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.io.BaseEncoding.base16;
Expand Down Expand Up @@ -113,11 +115,13 @@
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.Chars.padSpaces;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_SECOND;
Expand Down Expand Up @@ -334,6 +338,10 @@ public static Object getField(DateTimeZone localZone, Type type, Block block, in
if (type instanceof TimestampType timestampType) {
return getHiveTimestamp(localZone, timestampType, block, position);
}
if (type instanceof TimestampWithTimeZoneType) {
checkArgument(type.equals(TIMESTAMP_TZ_MILLIS));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good functionally from my perspective.

Code wise, I'm concerned about mixing Delta Lake specific code within HiveWriteUtils.

Do you see any reason not to introduce a DeltaLakeWriteUtils class that handles timestamp with time zone itself and delegates everything else to HiveWriteUtils.getField?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. This code is called from io.trino.plugin.deltalake.AbstractDeltaLakePageSink#createPartitionValues.
For example, I am not convinced that this is the appropriate check for Delta:

if (!CharMatcher.inRange((char) 0x20, (char) 0x7E).matchesAllOf(valueString)) {
throw new TrinoException(HIVE_INVALID_PARTITION_VALUE,
"Hive partition keys can only contain printable ASCII characters (0x20 - 0x7E). Invalid value: " +
base16().withSeparator(" ", 2).encode(valueString.getBytes(UTF_8)));
}

In the Delta context, the value gets used to:

  • make partition-like path
  • write to addEntry json (right?)

so the logic could indeed be Delta-specific.

As far as I am concerned, this can go in a follow-up.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will send a follow-up PR.

return getHiveTimestampTz(block, position);
}
if (type instanceof DecimalType decimalType) {
return getHiveDecimal(decimalType, block, position);
}
Expand Down Expand Up @@ -780,4 +788,10 @@ private static Timestamp getHiveTimestamp(DateTimeZone localZone, TimestampType
int nanosOfSecond = microsOfSecond * NANOSECONDS_PER_MICROSECOND + nanosOfMicro;
return Timestamp.ofEpochSecond(epochSeconds, nanosOfSecond);
}

/**
 * Builds a Hive {@link Timestamp} from the timestamp-with-time-zone value stored at {@code position}.
 * Only the UTC epoch-millisecond component extracted by {@code unpackMillisUtc} is used; nothing else
 * from the stored long contributes to the result.
 *
 * @param block block holding the value as a long
 * @param position index of the value within {@code block}
 * @return the Hive timestamp for the extracted epoch milliseconds
 */
private static Timestamp getHiveTimestampTz(Block block, int position)
{
    return Timestamp.ofEpochMilli(unpackMillisUtc(block.getLong(position, 0)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,50 @@ public void testPartitionedInsertCompatibility()
}
}

/**
 * Creates a table partitioned by a TIMESTAMP WITH TIME ZONE column through Trino, inserts rows through
 * both Delta and Trino, and verifies that both engines read the same UTC-normalized values and that the
 * partition directory names seen in "$path" match the expected encoding.
 */
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testTimestampWithTimeZonePartitionedInsertCompatibility()
{
    String tableName = "test_dl_timestamp_tz_partitioned_insert_" + randomNameSuffix();
    // The same table as addressed from each engine.
    String trinoTable = "delta.default." + tableName;
    String deltaTable = "default." + tableName;
    String tableLocation = "s3://" + bucketName + "/databricks-compatibility-test-" + tableName;

    onTrino().executeQuery(
            "CREATE TABLE " + trinoTable +
                    "(id INT, part TIMESTAMP WITH TIME ZONE)" +
                    "WITH (partitioned_by = ARRAY['part'], location = '" + tableLocation + "')");
    try {
        // Rows 1-2 are written by Delta, rows 3-4 by Trino; rows 2 and 3 use non-UTC offsets.
        onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES" +
                "(1, TIMESTAMP '0001-01-01 00:00:00.000 UTC')," +
                "(2, TIMESTAMP '2023-01-02 01:02:03.999 +01:00')");
        onTrino().executeQuery("INSERT INTO " + trinoTable + " VALUES" +
                "(3, TIMESTAMP '2023-03-04 01:02:03.999 -01:00')," +
                "(4, TIMESTAMP '9999-12-31 23:59:59.999 UTC')");

        // Expected values are the inserted instants rendered in UTC.
        List<Row> expectedRows = ImmutableList.of(
                row(1, "0001-01-01 00:00:00.000"),
                row(2, "2023-01-02 00:02:03.999"),
                row(3, "2023-03-04 02:02:03.999"),
                row(4, "9999-12-31 23:59:59.999"));

        String deltaSelect = "SELECT id, date_format(part, \"yyyy-MM-dd HH:mm:ss.SSS\") FROM " + deltaTable;
        String trinoSelect = "SELECT id, format_datetime(part, 'yyyy-MM-dd HH:mm:ss.SSS') FROM " + trinoTable;
        assertThat(onDelta().executeQuery(deltaSelect)).containsOnly(expectedRows);
        assertThat(onTrino().executeQuery(trinoSelect)).containsOnly(expectedRows);

        // Expected partition directory per row id (index 0 corresponds to id = 1).
        String[] expectedPathFragments = {
                "/part=0001-01-01 00%3A00%3A00/",
                "/part=2023-01-02 00%3A02%3A03.999/",
                "/part=2023-03-04 02%3A02%3A03.999/",
                "/part=9999-12-31 23%3A59%3A59.999/",
        };
        for (int id = 1; id <= expectedPathFragments.length; id++) {
            String path = (String) onTrino().executeQuery("SELECT \"$path\" FROM " + trinoTable + " WHERE id = " + id).getOnlyValue();
            assertThat(path).contains(expectedPathFragments[id - 1]);
        }
    }
    finally {
        onTrino().executeQuery("DROP TABLE " + trinoTable);
    }
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testTrinoPartitionedDifferentOrderInsertCompatibility()
Expand Down