Support changing column types in Hive connector #15938
Conversation
Force-pushed from bddf5dc to 42db233
if (block.isNull(i)) {
    isNull[i] = true;
}
else {
    values[i] = block.getByte(i, 0);
}
I suspect this approach is going to be slower because of the additional null check. You can try adding a JMH benchmark like BenchmarkOrcDecimalReader and compare the performance of the two approaches.
We're also not preserving the mayHaveNull of the original block; that should definitely be done.
We did these types of adaptations in the column readers rather than the page source in the Parquet reader as well.
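A minimal sketch of what a tinyint-to-integer copy that preserves mayHaveNull could look like, assuming the adaptation keeps the shape of the loop above (the method name and surrounding code are illustrative, not the PR's code):

private static Block toIntBlock(Block block)
{
    int positionCount = block.getPositionCount();
    int[] values = new int[positionCount];
    if (!block.mayHaveNull()) {
        // fast path: no null mask to allocate and no per-position null check
        for (int i = 0; i < positionCount; i++) {
            values[i] = block.getByte(i, 0);
        }
        return new IntArrayBlock(positionCount, Optional.empty(), values);
    }
    boolean[] isNull = new boolean[positionCount];
    for (int i = 0; i < positionCount; i++) {
        isNull[i] = block.isNull(i);
        if (!isNull[i]) {
            values[i] = block.getByte(i, 0);
        }
    }
    return new IntArrayBlock(positionCount, Optional.of(isNull), values);
}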
But won't having all the adaptations in the column reader be a bit messy? Or could we have adaptations that can be reused by both the Parquet and ORC file formats?
Also, some coercions are supported by Hive but could be restricted by Iceberg or Delta (not sure) - how would we handle those cases?
The primitive types of ORC and Parquet are different, so the required adaptations won't be the same (e.g. there is no byte/short in Parquet, only int32/int64).
We could create a function in ByteColumnReader to avoid repeating code like the below:
if (type == TINYINT) {
    return new ByteArrayBlock(nextBatchSize, Optional.empty(), values);
}
if (type == INTEGER) {
    return new IntArrayBlock(nextBatchSize, Optional.empty(), convertToIntArray(values));
}
throw new VerifyError("Unsupported type " + type);
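For instance, the repeated dispatch could live in one small helper (a sketch only; buildBlock is a made-up name, while type, nextBatchSize, and convertToIntArray are the reader members used in the snippet above):

private Block buildBlock(Optional<boolean[]> isNull, byte[] values)
{
    // single place that maps the reader's byte data to the requested Trino type
    if (type == TINYINT) {
        return new ByteArrayBlock(nextBatchSize, isNull, values);
    }
    if (type == INTEGER) {
        return new IntArrayBlock(nextBatchSize, isNull, convertToIntArray(values));
    }
    throw new VerifyError("Unsupported type " + type);
}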
These types of column adaptations done in the reader are simple ones meant to resolve minor differences between the file format type and the Trino type (e.g. differing byte counts of numbers, differences in precision of decimals, timestamps, etc.). These should make sense for any connector. Anything complex, like reading a number in the file format as a varchar Trino type, which a connector may or may not want to do, can be done in the connector.
@@ -65,9 +64,6 @@ public static ColumnReader createColumnReader(
    case BOOLEAN:
        return new BooleanColumnReader(type, column, memoryContext.newLocalMemoryContext(ColumnReaders.class.getSimpleName()));
    case BYTE:
        if (type == INTEGER && !column.getAttributes().containsKey("iceberg.id")) {
            throw invalidStreamType(column, type);
Don't require iceberg.id attribute in ORC reader
If I am understanding the change correctly, it could be titled "Fix reading ORC files after column evolved from tinyint to integer"
Am I reading this right?
Sorry for my misleading commit title. Updated it.
Thanks for changing the commit title.
Is TestHiveTransactionalTable.java the only test coverage we have for this "Fix reading ORC files after column evolved from tinyint to integer" change?
@@ -24,7 +24,6 @@
import io.trino.orc.stream.InputStreamSources;
import io.trino.spi.block.Block;
import io.trino.spi.block.ByteArrayBlock;
import io.trino.spi.block.IntArrayBlock;
Convert ORC block within ColumnAdaptation
The commit message bears no rationale. In fact, what's the benefit of this change?
@@ -77,7 +79,7 @@ public ByteColumnReader(Type type, OrcColumn column, LocalMemoryContext memoryCo
        throws OrcCorruptionException
{
    this.type = requireNonNull(type, "type is null");
    verifyStreamType(column, type, t -> t == TINYINT || t == INTEGER);
    verifyStreamType(column, type, t -> t == TINYINT || t == SMALLINT || t == INTEGER || t == BIGINT);
I thought the purpose of "Convert ORC block within ColumnAdaptation" was to have the readers simplified, so I'm surprised to see this change here.
I think I like the non-composed (without column adaptations) approach better though, so no changes requested here.
boolean[] isNull = new boolean[block.getPositionCount()];
short[] values = new short[block.getPositionCount()];
for (int i = 0; i < block.getPositionCount(); i++) {
    if (block.isNull(i)) {
If this had to be of top performance, it would probably want a simpler loop when !block.mayHaveNull().
No change requested, since I don't know the perf expectations.
BTW, would it work to avoid the if here?
isNull[i] = block.isNull(i);
values[i] = block.getByte(i, 0);
cc @dain @sopel39 @raunaqmorarka, who may know if it is legit
I don't get what was gained by adding abstractions at the page source layer for these column adaptations rather than letting the column reader handle it as it does today.
My expectation is that there should be no perf regression from the existing approach (demonstrated through JMH results).
Preserving the mayHaveNull of the original block is definitely required; downstream operators take advantage of that to improve performance.
Avoiding the branch in the above way looks okay to me; however, the contract in Block doesn't explicitly specify any expected behaviour for getters on null positions. So I don't know if that can be safely relied on, or if in the future it may change to throw an exception.
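A rough JMH sketch of the comparison being asked for (class, field, and method names are made up; real benchmarks such as BenchmarkOrcDecimalReader have more elaborate setup, @Fork/@Warmup tuning, and varying null densities):

import java.util.Optional;
import java.util.Random;

import io.trino.spi.block.Block;
import io.trino.spi.block.ByteArrayBlock;
import io.trino.spi.block.IntArrayBlock;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Thread)
public class BenchmarkByteToIntAdaptation
{
    private Block block;

    @Setup
    public void setup()
    {
        byte[] bytes = new byte[1024];
        new Random(42).nextBytes(bytes);
        block = new ByteArrayBlock(bytes.length, Optional.empty(), bytes);
    }

    @Benchmark
    public Block copyWithNullCheck()
    {
        // the PR's approach: branch on isNull for every position
        int positions = block.getPositionCount();
        boolean[] isNull = new boolean[positions];
        int[] values = new int[positions];
        for (int i = 0; i < positions; i++) {
            if (block.isNull(i)) {
                isNull[i] = true;
            }
            else {
                values[i] = block.getByte(i, 0);
            }
        }
        return new IntArrayBlock(positions, Optional.of(isNull), values);
    }

    @Benchmark
    public Block copyWithoutBranch()
    {
        // the branchless variant discussed above; relies on getByte being callable
        // on null positions, which the Block contract does not explicitly guarantee
        int positions = block.getPositionCount();
        boolean[] isNull = new boolean[positions];
        int[] values = new int[positions];
        for (int i = 0; i < positions; i++) {
            isNull[i] = block.isNull(i);
            values[i] = block.getByte(i, 0);
        }
        return new IntArrayBlock(positions, Optional.of(isNull), values);
    }
}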
@@ -2250,7 +2250,10 @@ private List<SetColumnTypeSetup> setColumnTypeSetupData()
{
    return ImmutableList.<SetColumnTypeSetup>builder()
            .add(new SetColumnTypeSetup("tinyint", "TINYINT '127'", "smallint"))
            .add(new SetColumnTypeSetup("tinyint", "TINYINT '127'", "integer"))
This probably belongs to "Support type evolution from tinyint to smallint and bigint in ORC" (currently in "Add test cases for changing numeric column types")
plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java (outdated, resolved)
plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java (outdated, resolved)
plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java (outdated, resolved)
plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java (outdated, resolved)
plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java (outdated, resolved)
...ve/src/test/java/io/trino/plugin/hive/metastore/glue/TestHiveGlueMetastoreCompatibility.java (outdated, resolved)
Force-pushed from 42db233 to b8d8f72
Force-pushed from b8d8f72 to 65d6cb1
Force-pushed from 65d6cb1 to 45c4adc
Rebased on master to resolve conflicts and added test cases for storage formats.
if (storageFormat != HiveStorageFormat.ORC && storageFormat != HiveStorageFormat.PARQUET) {
    throw new TrinoException(NOT_SUPPORTED, "Unsupported storage format for changing column type: " + storageFormat);
Is the check effective?
Can a partitioned table have partitions with different file formats?
BTW, it looks like we should be able to lift the limitation for e.g. TEXTFILE easily.
Please add a code comment explaining the logic: why ORC/Parquet only, and what our stance is on partitioned tables.
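For illustration only, something along these lines; the reasoning in the comment is my reading of this thread, not wording from the PR:

// Only the ORC and Parquet readers currently coerce the stored type to the new declared
// type on read, so other formats (e.g. TEXTFILE) are rejected until they get equivalent
// coercion support. Note this checks the table-level format only; partitions written
// with a different format are not validated individually here.
if (storageFormat != HiveStorageFormat.ORC && storageFormat != HiveStorageFormat.PARQUET) {
    throw new TrinoException(NOT_SUPPORTED, "Unsupported storage format for changing column type: " + storageFormat);
}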
ImmutableList.Builder<SetColumnTypeSetup> setup = ImmutableList.builder();
for (HiveStorageFormat storageFormat : HiveStorageFormat.values()) {
    if (storageFormat == REGEX) {
        // REGEX format is read-only
"Cannot prepare test data with REGEX table"
setup.addAll(super.setColumnTypeSetupData().stream()
        .map(data -> data.withTableProperty("format = '%s'".formatted(storageFormat)))
This is quite a few test cases.
For AVRO, RCBINARY, RCTEXT, SEQUENCEFILE, JSON, TEXTFILE, CSV, [REGEX] it should be enough to have one test case each, e.g. changing from integer to bigint and checking that the message is "Unsupported storage format for changing column type"
(then you can simplify the pattern in verifySetColumnTypeFailurePermissible).
@@ -2442,7 +2442,7 @@ private List<SetColumnTypeSetup> setColumnTypeSetupData()
        .build();
}

public record SetColumnTypeSetup(String sourceColumnType, String sourceValueLiteral, String newColumnType, String newValueLiteral, boolean unsupportedType)
public record SetColumnTypeSetup(String sourceColumnType, String sourceValueLiteral, String newColumnType, String newValueLiteral, boolean unsupportedType, Optional<String> tableProperty)
Optional<String> tableProperty -> String tableProperties
(an empty string means "no properties"; no need to wrap it in an Optional)
public SetColumnTypeSetup withTableProperty(String tableProperty)
{
    return new SetColumnTypeSetup(sourceColumnType, sourceValueLiteral, newColumnType, newValueLiteral, unsupportedType, Optional.of("WITH (%s)".formatted(tableProperty)));
withTableProperty(foo) should be equivalent to instantiating new SetColumnTypeSetup(..., foo); currently the former wraps the value with WITH (..).
Move the SQL formatting logic to where it is used (I'll add a separate comment there).
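A sketch of the suggested shape, assuming the record's last component becomes a plain String tableProperties as proposed above:

public SetColumnTypeSetup withTableProperty(String tableProperty)
{
    // pass the property through untouched; the "WITH (...)" SQL wrapping moves to the call site
    return new SetColumnTypeSetup(sourceColumnType, sourceValueLiteral, newColumnType, newValueLiteral, unsupportedType, tableProperty);
}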
@@ -2365,7 +2365,7 @@ public void testSetColumnTypes(SetColumnTypeSetup setup)

TestTable table;
try {
    table = new TestTable(getQueryRunner()::execute, "test_set_column_type_", " AS SELECT CAST(" + setup.sourceValueLiteral + " AS " + setup.sourceColumnType + ") AS col");
    table = new TestTable(getQueryRunner()::execute, "test_set_column_type_", setup.tableProperty.orElse("") + " AS SELECT CAST(" + setup.sourceValueLiteral + " AS " + setup.sourceColumnType + ") AS col");
String tableConfiguration = "";
if (!setup.tableProperties.isEmpty()) {
    tableConfiguration += " WITH (%s)".formatted(setup.tableProperties);
}
... new TestTable(..., tableConfiguration + " AS SELECT
You must use Hive to gather table statistics with ``ANALYZE TABLE COMPUTE STATISTICS`` after table creation.
pre-existing, but since you're changing this line.... see #15637 (comment)
Sent #16376
public void setColumnType(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle, Type type)
{
    HiveTableHandle table = (HiveTableHandle) tableHandle;
    failIfAvroSchemaIsSet(table);
We should fail for CSV files too (CSV is all varchar).
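Something like the following could sit next to failIfAvroSchemaIsSet (a sketch only; the getTableStorageFormat helper is hypothetical, and whatever resolves the table's format would do):

// CSV tables store every column as varchar, so changing the declared type would not
// change how the data is read; reject it like the Avro schema case above.
if (getTableStorageFormat(table) == HiveStorageFormat.CSV) {
    throw new TrinoException(NOT_SUPPORTED, "Changing column types is not supported for CSV tables");
}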
table.getPartitionNames().ifPresent(partitionNames -> {
    if (partitionNames.contains(column.getName())) {
        throw new TrinoException(NOT_SUPPORTED, "Changing partition column types is not supported");
Partition names = names of partitions != names of partitioning columns
if (sourceType instanceof VarcharType || sourceType instanceof CharType) {
    return targetType instanceof VarcharType || targetType instanceof CharType;
Is truncation allowed?
Add a comment
{
    return fields.stream()
            .filter(field -> field.getName().orElseThrow().equals(fieldName))
            .findAny();
We expect at most one; I guess we should fail when there are duplicates:
.collect(toOptional());
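That is, something like this, using Guava's MoreCollectors.toOptional(), which throws when more than one field matches instead of silently picking one (the stream shape follows the snippet above):

return fields.stream()
        .filter(field -> field.getName().orElseThrow().equals(fieldName))
        // import static com.google.common.collect.MoreCollectors.toOptional;
        .collect(toOptional());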
{
    Table oldTable = getExistingTable(databaseName, tableName);
    if (oldTable.getPartitionColumns().stream().anyMatch(column -> column.getName().equals(columnName))) {
        throw new TrinoException(NOT_SUPPORTED, "Changing partition column types is not supported");
Does this check buy us anything wrt the check already done in HiveMetadata?
io.trino.hive.thrift.metastore.Table table = delegate.getTable(databaseName, tableName)
        .orElseThrow(() -> new TableNotFoundException(new SchemaTableName(databaseName, tableName)));
if (table.getPartitionKeys().stream().anyMatch(column -> column.getName().equals(columnName))) {
    throw new TrinoException(NOT_SUPPORTED, "Changing partition column types is not supported");
Does this check buy us anything wrt the check already done in HiveMetadata?
assertThat(onHive("SELECT * FROM " + hiveTableName))
        .containsPattern("[ |]+123[ |]+");
onHive returns String output from a CLI; this isn't really meant to run test queries.
Let's use the product tests setup for that, where we have Hive JDBC and we can run queries normally, without processing textual output of some foreign CLI tool.
* See https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default
* on ways to set your AWS credentials which will be needed to run this test.
*/
public class TestHiveGlueMetastoreCompatibility
Is there an existing test class which we could add the test to?
It would be nice to avoid a minute-long setup just to run a second-long test case.
I can't find an existing test class except for TestHiveGlueMetastore extending AbstractTestHiveLocal. Do you want me to use that class instead? I avoided it because I prefer query-based tests.
The field isn't required in Thrift Hive metastore.
Force-pushed from 45c4adc to ec89b8e
// Hive changes the column definition in each partition unless the ALTER TABLE statement contains a partition condition
// Trino doesn't support specifying partitions in ALTER TABLE, so SET DATA TYPE updates all partitions
// https://cwiki.apache.org/confluence/display/hive/languagemanual+ddl#LanguageManualDDL-AlterPartition
metastore.alterPartitions(table.getSchemaName(), table.getTableName(), partitions, OptionalLong.empty());
I am surprised.
First, I wouldn't expect existing partitions to be updated at all.
Second, it will make it impossible to update the type in large tables.
Let's talk more about this.
Closing because of low demand.
@ebyhr Hello!
Description
Support changing column types in Hive connector
Release notes
(x) Release notes are required, with the following suggested text: