diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java index 35b30fabf168..cabb4af0fa55 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java @@ -103,6 +103,8 @@ import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; import io.trino.spi.type.VarcharType; +import org.apache.avro.Schema; +import org.apache.avro.SchemaParseException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.TableType; @@ -180,6 +182,7 @@ import static io.trino.plugin.hive.HiveSessionProperties.isSortedWritingEnabled; import static io.trino.plugin.hive.HiveSessionProperties.isStatisticsEnabled; import static io.trino.plugin.hive.HiveTableProperties.ANALYZE_COLUMNS_PROPERTY; +import static io.trino.plugin.hive.HiveTableProperties.AVRO_SCHEMA_LITERAL; import static io.trino.plugin.hive.HiveTableProperties.AVRO_SCHEMA_URL; import static io.trino.plugin.hive.HiveTableProperties.BUCKETED_BY_PROPERTY; import static io.trino.plugin.hive.HiveTableProperties.BUCKET_COUNT_PROPERTY; @@ -198,6 +201,7 @@ import static io.trino.plugin.hive.HiveTableProperties.TEXTFILE_FIELD_SEPARATOR; import static io.trino.plugin.hive.HiveTableProperties.TEXTFILE_FIELD_SEPARATOR_ESCAPE; import static io.trino.plugin.hive.HiveTableProperties.getAnalyzeColumns; +import static io.trino.plugin.hive.HiveTableProperties.getAvroSchemaLiteral; import static io.trino.plugin.hive.HiveTableProperties.getAvroSchemaUrl; import static io.trino.plugin.hive.HiveTableProperties.getBucketProperty; import static io.trino.plugin.hive.HiveTableProperties.getExternalLocation; @@ -298,6 +302,7 @@ public class HiveMetadata private static final String NULL_FORMAT_KEY = serdeConstants.SERIALIZATION_NULL_FORMAT; public static final String AVRO_SCHEMA_URL_KEY = 
"avro.schema.url"; + public static final String AVRO_SCHEMA_LITERAL_KEY = "avro.schema.literal"; public static final String SPARK_TABLE_PROVIDER_KEY = "spark.sql.sources.provider"; public static final String DELTA_LAKE_PROVIDER = "delta"; @@ -625,6 +630,10 @@ private ConnectorTableMetadata doGetTableMetadata(ConnectorSession session, Sche if (avroSchemaUrl != null) { properties.put(AVRO_SCHEMA_URL, avroSchemaUrl); } + String avroSchemaLiteral = table.getParameters().get(AVRO_SCHEMA_LITERAL_KEY); + if (avroSchemaLiteral != null) { + properties.put(AVRO_SCHEMA_LITERAL, avroSchemaLiteral); + } // Textfile and CSV specific properties getSerdeProperty(table, SKIP_HEADER_COUNT_KEY) @@ -852,6 +861,10 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe throw new TrinoException(NOT_SUPPORTED, "Bucketing/Partitioning columns not supported when Avro schema url is set"); } + if ((bucketProperty.isPresent() || !partitionedBy.isEmpty()) && getAvroSchemaLiteral(tableMetadata.getProperties()) != null) { + throw new TrinoException(NOT_SUPPORTED, "Bucketing/Partitioning columns not supported when Avro schema literal is set"); + } + validateTimestampColumns(tableMetadata.getColumns(), getTimestampPrecision(session)); List columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy)); HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(tableMetadata.getProperties()); @@ -933,10 +946,19 @@ private Map getEmptyTableProperties(ConnectorTableMetadata table // Avro specific properties String avroSchemaUrl = getAvroSchemaUrl(tableMetadata.getProperties()); + String avroSchemaLiteral = getAvroSchemaLiteral(tableMetadata.getProperties()); if (avroSchemaUrl != null) { + // Setting both properties is a user error in the table definition, so report INVALID_TABLE_PROPERTY instead of an internal IllegalStateException + if (avroSchemaLiteral != null) { + throw new TrinoException(INVALID_TABLE_PROPERTY, "avro_schema_url and avro_schema_literal cannot both be set"); + } checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.AVRO, AVRO_SCHEMA_URL); tableProperties.put(AVRO_SCHEMA_URL_KEY, 
validateAndNormalizeAvroSchemaUrl(avroSchemaUrl, hdfsContext)); } + else if (avroSchemaLiteral != null) { + checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.AVRO, AVRO_SCHEMA_LITERAL); + tableProperties.put(AVRO_SCHEMA_LITERAL_KEY, validateAvroSchemaLiteral(avroSchemaLiteral)); + } // Textfile and CSV specific properties Set csvAndTextFile = ImmutableSet.of(HiveStorageFormat.TEXTFILE, HiveStorageFormat.CSV); @@ -1053,6 +1075,17 @@ private String validateAndNormalizeAvroSchemaUrl(String url, HdfsContext context } } + private String validateAvroSchemaLiteral(String avroSchemaLiteral) + { + try { + new Schema.Parser().parse(avroSchemaLiteral); + return avroSchemaLiteral; + } + catch (SchemaParseException e) { + throw new TrinoException(INVALID_TABLE_PROPERTY, "Failed to parse Avro schema: " + avroSchemaLiteral, e); + } + } + private static Path getExternalLocationAsPath(String location) { try { @@ -1188,6 +1221,9 @@ private void failIfAvroSchemaIsSet(ConnectorSession session, HiveTableHandle han if (table.getParameters().containsKey(AVRO_SCHEMA_URL_KEY) || table.getStorage().getSerdeParameters().containsKey(AVRO_SCHEMA_URL_KEY)) { throw new TrinoException(NOT_SUPPORTED, "ALTER TABLE not supported when Avro schema url is set"); } + if (table.getParameters().containsKey(AVRO_SCHEMA_LITERAL_KEY) || table.getStorage().getSerdeParameters().containsKey(AVRO_SCHEMA_LITERAL_KEY)) { + throw new TrinoException(NOT_SUPPORTED, "ALTER TABLE not supported when Avro schema literal is set"); + } } @Override @@ -1310,6 +1346,10 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto throw new TrinoException(NOT_SUPPORTED, "CREATE TABLE AS not supported when Avro schema url is set"); } + if (getAvroSchemaLiteral(tableMetadata.getProperties()) != null) { + throw new TrinoException(NOT_SUPPORTED, "CREATE TABLE AS not supported when Avro schema literal is set"); + } + HiveStorageFormat tableStorageFormat = 
getHiveStorageFormat(tableMetadata.getProperties()); List partitionedBy = getPartitionedBy(tableMetadata.getProperties()); Optional bucketProperty = getBucketProperty(tableMetadata.getProperties()); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableProperties.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableProperties.java index 4769179b8556..83f494cc90da 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableProperties.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableProperties.java @@ -62,6 +62,7 @@ public class HiveTableProperties public static final String ORC_BLOOM_FILTER_COLUMNS = "orc_bloom_filter_columns"; public static final String ORC_BLOOM_FILTER_FPP = "orc_bloom_filter_fpp"; public static final String AVRO_SCHEMA_URL = "avro_schema_url"; + public static final String AVRO_SCHEMA_LITERAL = "avro_schema_literal"; public static final String TEXTFILE_FIELD_SEPARATOR = "textfile_field_separator"; public static final String TEXTFILE_FIELD_SEPARATOR_ESCAPE = "textfile_field_separator_escape"; public static final String NULL_FORMAT_PROPERTY = "null_format"; @@ -148,6 +149,7 @@ public HiveTableProperties( integerProperty(BUCKETING_VERSION, "Bucketing version", null, false), integerProperty(BUCKET_COUNT_PROPERTY, "Number of buckets", 0, false), stringProperty(AVRO_SCHEMA_URL, "URI pointing to Avro schema for the table", null, false), + stringProperty(AVRO_SCHEMA_LITERAL, "JSON-encoded Avro schema for the table", null, false), integerProperty(SKIP_HEADER_LINE_COUNT, "Number of header lines", null, false), integerProperty(SKIP_FOOTER_LINE_COUNT, "Number of footer lines", null, false), stringProperty(TEXTFILE_FIELD_SEPARATOR, "TEXTFILE field separator character", null, false), @@ -174,6 +176,11 @@ public static String getAvroSchemaUrl(Map tableProperties) return (String) tableProperties.get(AVRO_SCHEMA_URL); } + public static String getAvroSchemaLiteral(Map tableProperties) + { + 
return (String) tableProperties.get(AVRO_SCHEMA_LITERAL); + } + public static Optional getHeaderSkipCount(Map tableProperties) { return Optional.ofNullable((Integer) tableProperties.get(SKIP_HEADER_LINE_COUNT)); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/MetastoreUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/MetastoreUtil.java index c02662763d5e..f9f08c600b50 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/MetastoreUtil.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/MetastoreUtil.java @@ -43,6 +43,7 @@ import io.trino.spi.type.VarcharType; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.metastore.ProtectMode; +import org.apache.hadoop.hive.serde2.avro.AvroSerDe; import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; @@ -57,10 +58,9 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.base.Strings.nullToEmpty; import static com.google.common.collect.ImmutableList.toImmutableList; -import static io.trino.plugin.hive.HiveMetadata.AVRO_SCHEMA_URL_KEY; import static io.trino.plugin.hive.HiveSplitManager.PRESTO_OFFLINE; -import static io.trino.plugin.hive.HiveStorageFormat.AVRO; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.NUM_ROWS; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.predicate.TupleDomain.withColumnDomains; @@ -95,6 +95,7 @@ public static Properties getHiveSchema(Table table) // Mimics function in Hive: MetaStoreUtils.getTableMetadata(Table) return getHiveSchema( table.getStorage(), + Optional.empty(), table.getDataColumns(), table.getDataColumns(), table.getParameters(), @@ -108,6 +109,7 @@ public static Properties getHiveSchema(Partition partition, Table table) // Mimics function in Hive: 
MetaStoreUtils.getSchema(Partition, Table) return getHiveSchema( partition.getStorage(), + Optional.of(table.getStorage()), partition.getColumns(), table.getDataColumns(), table.getParameters(), @@ -118,6 +120,7 @@ public static Properties getHiveSchema(Partition partition, Table table) private static Properties getHiveSchema( Storage sd, + Optional tableSd, List dataColumns, List tableDataColumns, Map parameters, @@ -147,6 +150,14 @@ private static Properties getHiveSchema( for (Map.Entry param : sd.getSerdeParameters().entrySet()) { schema.setProperty(param.getKey(), (param.getValue() != null) ? param.getValue() : ""); } + // Sometimes the Avro schema is stored in table-level SerDe parameters. In such cases, the table-level SerDe parameters + // override the partition-level ones set above, so that the table-level schema is passed to GenericHiveRecordCursor and + // the Avro reader handles schema evolution itself. + if (sd.getStorageFormat().getSerDe().equals(AvroSerDe.class.getName()) && tableSd.isPresent()) { + for (Map.Entry param : tableSd.get().getSerdeParameters().entrySet()) { + schema.setProperty(param.getKey(), nullToEmpty(param.getValue())); + } + } schema.setProperty(SERIALIZATION_LIB, sd.getStorageFormat().getSerDe()); StringBuilder columnNameBuilder = new StringBuilder(); @@ -213,13 +224,6 @@ public static ProtectMode getProtectMode(Table table) return getProtectMode(table.getParameters()); } - public static boolean isAvroTableWithSchemaSet(Table table) - { - return AVRO.getSerDe().equals(table.getStorage().getStorageFormat().getSerDeNullable()) && - (table.getParameters().get(AVRO_SCHEMA_URL_KEY) != null || - (table.getStorage().getSerdeParameters().get(AVRO_SCHEMA_URL_KEY) != null)); - } - public static String makePartitionName(Table table, Partition partition) { return makePartitionName(table.getPartitionColumns(), partition.getValues()); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/BridgingHiveMetastore.java 
b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/BridgingHiveMetastore.java index 9dd710c3bfeb..827f186fc60a 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/BridgingHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/BridgingHiveMetastore.java @@ -53,13 +53,13 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; -import static io.trino.plugin.hive.metastore.MetastoreUtil.isAvroTableWithSchemaSet; import static io.trino.plugin.hive.metastore.MetastoreUtil.verifyCanDropColumn; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.csvSchemaFields; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromMetastoreApiDatabase; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromMetastoreApiTable; -import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.isAvroTableWithSchemaSet; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.isCsvTable; +import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.isPartitionSerdesUsingMetastoreForSchema; +import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.isTableSerdesUsingMetastoreForSchema; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.toMetastoreApiDatabase; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.toMetastoreApiTable; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; @@ -94,12 +94,12 @@ public List getAllDatabases() public Optional getTable(HiveIdentity identity, String databaseName, String tableName) { return delegate.getTable(identity, databaseName, tableName).map(table -> { - if (isAvroTableWithSchemaSet(table)) { - return fromMetastoreApiTable(table, delegate.getFields(identity, databaseName, tableName).orElseThrow()); - } if (isCsvTable(table)) { 
return fromMetastoreApiTable(table, csvSchemaFields(table.getSd().getCols())); } + if (!isTableSerdesUsingMetastoreForSchema(table)) { + return fromMetastoreApiTable(table, delegate.getFields(identity, databaseName, tableName).orElseThrow()); + } return fromMetastoreApiTable(table); }); } @@ -381,7 +381,10 @@ public Map> getPartitionsByNames(HiveIdentity identi private Partition fromMetastoreApiPartition(Table table, org.apache.hadoop.hive.metastore.api.Partition partition) { - if (isAvroTableWithSchemaSet(table)) { + // If a partition's Serde doesn't use metastore for schema (like Avro), we use table-level schema, instead of + // fetching the schema from the partition's Serde. Data publishers are responsible for making sure that table-level + // schema is workable for these partitions. + if (!isPartitionSerdesUsingMetastoreForSchema(partition)) { List schema = table.getDataColumns().stream() .map(ThriftMetastoreUtil::toMetastoreApiFieldSchema) .collect(toImmutableList()); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java index 281f60240528..58cfd180e851 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java @@ -136,7 +136,7 @@ import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromRolePrincipalGrants; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromTrinoPrincipalType; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.getHiveBasicStatistics; -import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.isAvroTableWithSchemaSet; +import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.isTableSerdesUsingMetastoreForSchema; import static 
io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.parsePrivilege; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.toMetastoreApiPartition; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.updateStatisticsParameters; @@ -535,8 +535,8 @@ public void updateTableStatistics(HiveIdentity identity, String databaseName, St List metastoreColumnStatistics = updatedStatistics.getColumnStatistics().entrySet().stream() .flatMap(entry -> { Optional column = table.getColumn(entry.getKey()); - if (column.isEmpty() && isAvroTableWithSchemaSet(modifiedTable)) { - // Avro table can have different effective schema than declared in metastore. Still, metastore does not allow + if (column.isEmpty() && !isTableSerdesUsingMetastoreForSchema(modifiedTable)) { + // Some tables can have different effective schema than declared in metastore. Still, metastore does not allow // to store statistics for a column it does not know about. return Stream.of(); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java index 750eec1b7c29..1f68131296ce 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; @@ -97,8 +98,6 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static 
io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; -import static io.trino.plugin.hive.HiveMetadata.AVRO_SCHEMA_URL_KEY; -import static io.trino.plugin.hive.HiveStorageFormat.AVRO; import static io.trino.plugin.hive.HiveStorageFormat.CSV; import static io.trino.plugin.hive.metastore.HiveColumnStatistics.createBinaryColumnStatistics; import static io.trino.plugin.hive.metastore.HiveColumnStatistics.createBooleanColumnStatistics; @@ -152,6 +151,12 @@ public final class ThriftMetastoreUtil private static final String RAW_DATA_SIZE = "rawDataSize"; private static final String TOTAL_SIZE = "totalSize"; private static final Set STATS_PROPERTIES = ImmutableSet.of(NUM_FILES, NUM_ROWS, RAW_DATA_SIZE, TOTAL_SIZE); + // "org.apache.hive.hcatalog.data.JsonSerDe" is included here because it's in hive-hcatalog-core.jar rather than hive-serde.jar. Metastore call + // getFields doesn't work with it. + public static final Set SERDES_USING_METASTORE_FOR_SCHEMA = ImmutableSet.builder() + .addAll(Arrays.asList(((String) MetastoreConf.ConfVars.SERDES_USING_METASTORE_FOR_SCHEMA.getDefaultVal()).split(","))) + .add("org.apache.hive.hcatalog.data.JsonSerDe") + .build(); private ThriftMetastoreUtil() {} @@ -436,17 +441,15 @@ public static Table fromMetastoreApiTable(org.apache.hadoop.hive.metastore.api.T return tableBuilder.build(); } - public static boolean isAvroTableWithSchemaSet(org.apache.hadoop.hive.metastore.api.Table table) + public static boolean isTableSerdesUsingMetastoreForSchema(org.apache.hadoop.hive.metastore.api.Table table) { - if (table.getParameters() == null) { - return false; - } - SerDeInfo serdeInfo = getSerdeInfo(table); + return SERDES_USING_METASTORE_FOR_SCHEMA.contains(getSerdeInfo(table).getSerializationLib()); + } - return serdeInfo.getSerializationLib() != null && - (table.getParameters().get(AVRO_SCHEMA_URL_KEY) != null || - (serdeInfo.getParameters() != null && serdeInfo.getParameters().get(AVRO_SCHEMA_URL_KEY) != null)) && - 
serdeInfo.getSerializationLib().equals(AVRO.getSerDe()); + public static boolean isPartitionSerdesUsingMetastoreForSchema(org.apache.hadoop.hive.metastore.api.Partition partition) + { + SerDeInfo serDeInfo = getSerdeInfo(partition); + return serDeInfo != null && SERDES_USING_METASTORE_FOR_SCHEMA.contains(serDeInfo.getSerializationLib()); } public static boolean isCsvTable(org.apache.hadoop.hive.metastore.api.Table table) @@ -511,6 +514,15 @@ public static Partition fromMetastoreApiPartition(org.apache.hadoop.hive.metasto return partitionBuilder.build(); } + private static SerDeInfo getSerdeInfo(org.apache.hadoop.hive.metastore.api.Partition partition) + { + StorageDescriptor storageDescriptor = partition.getSd(); + if (storageDescriptor == null) { + throw new TrinoException(HIVE_INVALID_METADATA, "Partition does not contain a storage descriptor: " + partition); + } + return storageDescriptor.getSerdeInfo(); + } + public static HiveColumnStatistics fromMetastoreApiColumnStatistics(ColumnStatisticsObj columnStatistics, OptionalLong rowCount) { if (columnStatistics.getStatsData().isSetLongStats()) { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/MockThriftMetastoreClient.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/MockThriftMetastoreClient.java index 3b5fda750727..f3650f71d7c1 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/MockThriftMetastoreClient.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/MockThriftMetastoreClient.java @@ -166,7 +166,7 @@ public Table getTableWithCapabilities(String databaseName, String tableName) @Override public List getFields(String databaseName, String tableName) { - return ImmutableList.of(new FieldSchema("key", "string", null)); + return ImmutableList.of(new FieldSchema(TEST_COLUMN, "bigint", null)); } @Override diff --git 
a/testing/trino-product-tests/src/main/java/io/trino/tests/hive/TestAvroSchemaEvolution.java b/testing/trino-product-tests/src/main/java/io/trino/tests/hive/TestAvroSchemaEvolution.java index 25c641d4a0e5..ab9f1d7815f7 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/hive/TestAvroSchemaEvolution.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/hive/TestAvroSchemaEvolution.java @@ -19,6 +19,11 @@ import io.trino.tempto.query.QueryExecutor; import org.testng.annotations.Test; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; + import static io.trino.tempto.assertions.QueryAssert.Row.row; import static io.trino.tempto.assertions.QueryAssert.assertThat; import static io.trino.tempto.context.ThreadLocalTestContextHolder.testContext; @@ -29,54 +34,70 @@ public class TestAvroSchemaEvolution extends ProductTest { - private static final String TABLE_NAME = "product_tests_avro_table"; + private static final String TABLE_WITH_SCHEMA_URL = "product_tests_avro_table_with_schema_url"; + private static final String TABLE_WITH_SCHEMA_LITERAL = "product_tests_avro_table_with_schema_literal"; // TODO move Avro schema files to classpath and use tempto SshClient to upload them - private static final String ORIGINAL_SCHEMA = "file:///docker/presto-product-tests/avro/original_schema.avsc"; - private static final String CREATE_TABLE = format("" + - "CREATE TABLE %s (dummy_col VARCHAR)" + - "WITH (" + - "format='AVRO', " + - "avro_schema_url='%s'" + - ")", - TABLE_NAME, - ORIGINAL_SCHEMA); - private static final String RENAMED_COLUMN_SCHEMA = "file:///docker/presto-product-tests/avro/rename_column_schema.avsc"; - private static final String REMOVED_COLUMN_SCHEMA = "file:///docker/presto-product-tests/avro/remove_column_schema.avsc"; - private static final String ADDED_COLUMN_SCHEMA = "file:///docker/presto-product-tests/avro/add_column_schema.avsc"; - private static final String 
CHANGE_COLUMN_TYPE_SCHEMA = "file:///docker/presto-product-tests/avro/change_column_type_schema.avsc"; - private static final String INCOMPATIBLE_TYPE_SCHEMA = "file:///docker/presto-product-tests/avro/incompatible_type_schema.avsc"; - private static final String SELECT_STAR = "SELECT * FROM " + TABLE_NAME; - private static final String COLUMNS_IN_TABLE = "SHOW COLUMNS IN " + TABLE_NAME; + private static final String ORIGINAL_SCHEMA_URL = "file:///docker/presto-product-tests/avro/original_schema.avsc"; + private static final String RENAMED_COLUMN_SCHEMA_URL = "file:///docker/presto-product-tests/avro/rename_column_schema.avsc"; + private static final String REMOVED_COLUMN_SCHEMA_URL = "file:///docker/presto-product-tests/avro/remove_column_schema.avsc"; + private static final String ADDED_COLUMN_SCHEMA_URL = "file:///docker/presto-product-tests/avro/add_column_schema.avsc"; + private static final String CHANGE_COLUMN_TYPE_SCHEMA_URL = "file:///docker/presto-product-tests/avro/change_column_type_schema.avsc"; + private static final String INCOMPATIBLE_TYPE_SCHEMA_URL = "file:///docker/presto-product-tests/avro/incompatible_type_schema.avsc"; @BeforeTestWithContext public void createAndLoadTable() + throws IOException { - query(CREATE_TABLE); - query(format("INSERT INTO %s VALUES ('string0', 0)", TABLE_NAME)); + query(format("" + + "CREATE TABLE %s (dummy_col VARCHAR)" + + "WITH (" + + "format='AVRO', " + + "avro_schema_url='%s'" + + ")", + TABLE_WITH_SCHEMA_URL, + ORIGINAL_SCHEMA_URL)); + query(format("" + + "CREATE TABLE %s (dummy_col VARCHAR)" + + "WITH (" + + "format='AVRO', " + + "avro_schema_literal='%s'" + + ")", + TABLE_WITH_SCHEMA_LITERAL, + readSchemaLiteralFromUrl(ORIGINAL_SCHEMA_URL))); + query(format("INSERT INTO %s VALUES ('string0', 0)", TABLE_WITH_SCHEMA_URL)); + query(format("INSERT INTO %s VALUES ('string0', 0)", TABLE_WITH_SCHEMA_LITERAL)); } @AfterTestWithContext public void dropTestTable() { - query(format("DROP TABLE IF EXISTS %s", TABLE_NAME)); 
+ query(format("DROP TABLE IF EXISTS %s", TABLE_WITH_SCHEMA_URL)); + query(format("DROP TABLE IF EXISTS %s", TABLE_WITH_SCHEMA_LITERAL)); } @Test(groups = AVRO) public void testSelectTable() { - assertThat(query(format("SELECT string_col FROM %s", TABLE_NAME))) - .containsExactly(row("string0")); + assertUnmodified(); } @Test(groups = AVRO) public void testInsertAfterSchemaEvolution() + throws IOException { - assertThat(query(SELECT_STAR)) - .containsExactly(row("string0", 0)); + assertUnmodified(); + + alterTableSchemaUrl(TABLE_WITH_SCHEMA_URL, ADDED_COLUMN_SCHEMA_URL); + alterTableSchemaLiteral(TABLE_WITH_SCHEMA_LITERAL, readSchemaLiteralFromUrl(ADDED_COLUMN_SCHEMA_URL)); - alterTableSchemaTo(ADDED_COLUMN_SCHEMA); - query(format("INSERT INTO %s VALUES ('string1', 1, 101)", TABLE_NAME)); - assertThat(query(SELECT_STAR)) + query(format("INSERT INTO %s VALUES ('string1', 1, 101)", TABLE_WITH_SCHEMA_URL)); + query(format("INSERT INTO %s VALUES ('string1', 1, 101)", TABLE_WITH_SCHEMA_LITERAL)); + + assertThat(query("SELECT * FROM " + TABLE_WITH_SCHEMA_URL)) + .containsOnly( + row("string0", 0, 100), + row("string1", 1, 101)); + assertThat(query("SELECT * FROM " + TABLE_WITH_SCHEMA_LITERAL)) .containsOnly( row("string0", 0, 100), row("string1", 1, 101)); @@ -84,74 +105,107 @@ public void testInsertAfterSchemaEvolution() @Test(groups = AVRO) public void testSchemaEvolutionWithIncompatibleType() + throws IOException { - assertThat(query(COLUMNS_IN_TABLE)) - .containsExactly( - row("string_col", "varchar", "", ""), - row("int_col", "integer", "", "")); - assertThat(query(SELECT_STAR)) - .containsExactly(row("string0", 0)); + assertUnmodified(); - alterTableSchemaTo(INCOMPATIBLE_TYPE_SCHEMA); - assertThat(() -> query(SELECT_STAR)) + alterTableSchemaUrl(TABLE_WITH_SCHEMA_URL, INCOMPATIBLE_TYPE_SCHEMA_URL); + alterTableSchemaLiteral(TABLE_WITH_SCHEMA_LITERAL, readSchemaLiteralFromUrl(INCOMPATIBLE_TYPE_SCHEMA_URL)); + + assertThat(() -> query("SELECT * FROM " + 
TABLE_WITH_SCHEMA_URL)) + .failsWithMessage("Found int, expecting string"); + assertThat(() -> query("SELECT * FROM " + TABLE_WITH_SCHEMA_LITERAL)) + .failsWithMessage("Found int, expecting string"); } @Test(groups = AVRO) public void testSchemaEvolution() + throws IOException { - assertThat(query(COLUMNS_IN_TABLE)) - .containsExactly( - row("string_col", "varchar", "", ""), - row("int_col", "integer", "", "")); - assertThat(query(SELECT_STAR)) - .containsExactly(row("string0", 0)); + assertUnmodified(); - alterTableSchemaTo(CHANGE_COLUMN_TYPE_SCHEMA); - assertThat(query(COLUMNS_IN_TABLE)) - .containsExactly( + alterTableSchemaUrl(TABLE_WITH_SCHEMA_URL, CHANGE_COLUMN_TYPE_SCHEMA_URL); + alterTableSchemaLiteral(TABLE_WITH_SCHEMA_LITERAL, readSchemaLiteralFromUrl(CHANGE_COLUMN_TYPE_SCHEMA_URL)); + + assertThat(query("SHOW COLUMNS IN " + TABLE_WITH_SCHEMA_URL)) + .containsExactlyInOrder( + row("string_col", "varchar", "", ""), + row("int_col", "bigint", "", "")); + assertThat(query("SHOW COLUMNS IN " + TABLE_WITH_SCHEMA_LITERAL)) + .containsExactlyInOrder( row("string_col", "varchar", "", ""), row("int_col", "bigint", "", "")); - assertThat(query(SELECT_STAR)) - .containsExactly(row("string0", 0)); - alterTableSchemaTo(ADDED_COLUMN_SCHEMA); - assertThat(query(COLUMNS_IN_TABLE)) - .containsExactly( + assertThat(query("SELECT * FROM " + TABLE_WITH_SCHEMA_URL)) + .containsOnly(row("string0", 0)); + assertThat(query("SELECT * FROM " + TABLE_WITH_SCHEMA_LITERAL)) + .containsOnly(row("string0", 0)); + + alterTableSchemaUrl(TABLE_WITH_SCHEMA_URL, ADDED_COLUMN_SCHEMA_URL); + alterTableSchemaLiteral(TABLE_WITH_SCHEMA_LITERAL, readSchemaLiteralFromUrl(ADDED_COLUMN_SCHEMA_URL)); + + assertThat(query("SHOW COLUMNS IN " + TABLE_WITH_SCHEMA_URL)) + .containsExactlyInOrder( row("string_col", "varchar", "", ""), row("int_col", "integer", "", ""), row("int_col_added", "integer", "", "")); - assertThat(query(SELECT_STAR)) - .containsExactly(row("string0", 0, 100)); - - 
alterTableSchemaTo(REMOVED_COLUMN_SCHEMA); - assertThat(query(COLUMNS_IN_TABLE)) - .containsExactly(row("int_col", "integer", "", "")); - assertThat(query(SELECT_STAR)) - .containsExactly(row(0)); - - alterTableSchemaTo(RENAMED_COLUMN_SCHEMA); - assertThat(query(COLUMNS_IN_TABLE)) - .containsExactly( + assertThat(query("SHOW COLUMNS IN " + TABLE_WITH_SCHEMA_LITERAL)) + .containsExactlyInOrder( + row("string_col", "varchar", "", ""), + row("int_col", "integer", "", ""), + row("int_col_added", "integer", "", "")); + + assertThat(query("SELECT * FROM " + TABLE_WITH_SCHEMA_URL)) + .containsOnly(row("string0", 0, 100)); + assertThat(query("SELECT * FROM " + TABLE_WITH_SCHEMA_LITERAL)) + .containsOnly(row("string0", 0, 100)); + + alterTableSchemaUrl(TABLE_WITH_SCHEMA_URL, REMOVED_COLUMN_SCHEMA_URL); + alterTableSchemaLiteral(TABLE_WITH_SCHEMA_LITERAL, readSchemaLiteralFromUrl(REMOVED_COLUMN_SCHEMA_URL)); + + assertThat(query("SHOW COLUMNS IN " + TABLE_WITH_SCHEMA_URL)) + .containsExactlyInOrder( + row("int_col", "integer", "", "")); + assertThat(query("SHOW COLUMNS IN " + TABLE_WITH_SCHEMA_LITERAL)) + .containsExactlyInOrder( + row("int_col", "integer", "", "")); + + assertThat(query("SELECT * FROM " + TABLE_WITH_SCHEMA_URL)) + .containsOnly(row(0)); + assertThat(query("SELECT * FROM " + TABLE_WITH_SCHEMA_LITERAL)) + .containsOnly(row(0)); + + alterTableSchemaUrl(TABLE_WITH_SCHEMA_URL, RENAMED_COLUMN_SCHEMA_URL); + alterTableSchemaLiteral(TABLE_WITH_SCHEMA_LITERAL, readSchemaLiteralFromUrl(RENAMED_COLUMN_SCHEMA_URL)); + + assertThat(query("SHOW COLUMNS IN " + TABLE_WITH_SCHEMA_URL)) + .containsExactlyInOrder( + row("string_col", "varchar", "", ""), + row("int_col_renamed", "integer", "", "")); + assertThat(query("SHOW COLUMNS IN " + TABLE_WITH_SCHEMA_LITERAL)) + .containsExactlyInOrder( row("string_col", "varchar", "", ""), row("int_col_renamed", "integer", "", "")); - assertThat(query(SELECT_STAR)) - .containsExactly(row("string0", null)); + + assertThat(query("SELECT * 
FROM " + TABLE_WITH_SCHEMA_URL)) + .containsOnly(row("string0", null)); + assertThat(query("SELECT * FROM " + TABLE_WITH_SCHEMA_LITERAL)) + .containsOnly(row("string0", null)); } @Test(groups = AVRO) public void testSchemaWhenUrlIsUnset() { - assertThat(query(COLUMNS_IN_TABLE)) - .containsExactly( - row("string_col", "varchar", "", ""), - row("int_col", "integer", "", "")); - assertThat(query(SELECT_STAR)) - .containsExactly(row("string0", 0)); + assertUnmodified(); - executeHiveQuery(format("ALTER TABLE %s UNSET TBLPROPERTIES('avro.schema.url')", TABLE_NAME)); - assertThat(query(COLUMNS_IN_TABLE)) - .containsExactly( + executeHiveQuery(format("ALTER TABLE %s UNSET TBLPROPERTIES('avro.schema.url')", TABLE_WITH_SCHEMA_URL)); + executeHiveQuery(format("ALTER TABLE %s UNSET TBLPROPERTIES('avro.schema.literal')", TABLE_WITH_SCHEMA_LITERAL)); + + assertThat(query("SHOW COLUMNS IN " + TABLE_WITH_SCHEMA_URL)) + .containsExactlyInOrder( + row("dummy_col", "varchar", "", "")); + assertThat(query("SHOW COLUMNS IN " + TABLE_WITH_SCHEMA_LITERAL)) + .containsExactlyInOrder( row("dummy_col", "varchar", "", "")); } @@ -162,22 +216,61 @@ public void testCreateTableLike() query(format( "CREATE TABLE %s (LIKE %s INCLUDING PROPERTIES)", createTableLikeName, - TABLE_NAME)); + TABLE_WITH_SCHEMA_URL)); + + query(format("INSERT INTO %s VALUES ('string0', 0)", createTableLikeName)); + + assertThat(query(format("SELECT string_col FROM %s", createTableLikeName))) + .containsExactlyInOrder(row("string0")); + query("DROP TABLE IF EXISTS " + createTableLikeName); + + query(format( + "CREATE TABLE %s (LIKE %s INCLUDING PROPERTIES)", + createTableLikeName, + TABLE_WITH_SCHEMA_LITERAL)); query(format("INSERT INTO %s VALUES ('string0', 0)", createTableLikeName)); assertThat(query(format("SELECT string_col FROM %s", createTableLikeName))) - .containsExactly(row("string0")); + .containsExactlyInOrder(row("string0")); query("DROP TABLE IF EXISTS " + createTableLikeName); } - private void 
alterTableSchemaTo(String schema) + private void assertUnmodified() { - executeHiveQuery(format("ALTER TABLE %s SET TBLPROPERTIES('avro.schema.url'='%s')", TABLE_NAME, schema)); + assertThat(query("SHOW COLUMNS IN " + TABLE_WITH_SCHEMA_URL)) + .containsExactlyInOrder( + row("string_col", "varchar", "", ""), + row("int_col", "integer", "", "")); + assertThat(query("SHOW COLUMNS IN " + TABLE_WITH_SCHEMA_LITERAL)) + .containsExactlyInOrder( + row("string_col", "varchar", "", ""), + row("int_col", "integer", "", "")); + + assertThat(query("SELECT * FROM " + TABLE_WITH_SCHEMA_URL)) + .containsOnly(row("string0", 0)); + assertThat(query("SELECT * FROM " + TABLE_WITH_SCHEMA_LITERAL)) + .containsOnly(row("string0", 0)); + } + + private static void alterTableSchemaUrl(String tableName, String newUrl) + { + executeHiveQuery(format("ALTER TABLE %s SET TBLPROPERTIES('avro.schema.url'='%s')", tableName, newUrl)); + } + + private static void alterTableSchemaLiteral(String tableName, String newLiteral) + { + executeHiveQuery(format("ALTER TABLE %s SET TBLPROPERTIES('avro.schema.literal'='%s')", tableName, newLiteral)); } private static void executeHiveQuery(String query) { testContext().getDependency(QueryExecutor.class, "hive").executeQuery(query); } + + private static String readSchemaLiteralFromUrl(String url) + throws IOException + { + return Files.readString(Path.of(URI.create(url))); + } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/hive/TestAvroSchemaLiteral.java b/testing/trino-product-tests/src/main/java/io/trino/tests/hive/TestAvroSchemaLiteral.java new file mode 100644 index 000000000000..411993418bf8 --- /dev/null +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/hive/TestAvroSchemaLiteral.java @@ -0,0 +1,71 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.hive; + +import org.intellij.lang.annotations.Language; +import org.testng.annotations.Test; + +import static io.trino.tempto.assertions.QueryAssert.Row.row; +import static io.trino.tempto.assertions.QueryAssert.assertThat; +import static io.trino.tests.TestGroups.AVRO; +import static io.trino.tests.utils.QueryExecutors.onHive; +import static io.trino.tests.utils.QueryExecutors.onPresto; +import static java.lang.String.format; + +public class TestAvroSchemaLiteral + extends HiveProductTest +{ + @Language("JSON") + private static final String SCHEMA_LITERAL = "{\n" + + " \"namespace\": \"io.trino.test\",\n" + + " \"name\": \"product_tests_avro_table\",\n" + + " \"type\": \"record\",\n" + + " \"fields\": [\n" + + " { \"name\":\"string_col\", \"type\":\"string\"},\n" + + " { \"name\":\"int_col\", \"type\":\"int\" }\n" + + "]}"; + + @Test(groups = AVRO) + public void testHiveCreatedTable() + { + onHive().executeQuery("DROP TABLE IF EXISTS test_avro_schema_literal_hive"); + onHive().executeQuery(format("" + + "CREATE TABLE test_avro_schema_literal_hive " + + "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' " + + "STORED AS " + + "INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' " + + "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' " + + "TBLPROPERTIES ('avro.schema.literal'='%s')", + SCHEMA_LITERAL)); + onHive().executeQuery("INSERT INTO test_avro_schema_literal_hive VALUES ('some text', 123042)"); + + assertThat(onHive().executeQuery("SELECT * FROM 
test_avro_schema_literal_hive")).containsExactlyInOrder(row("some text", 123042)); + assertThat(onPresto().executeQuery("SELECT * FROM test_avro_schema_literal_hive")).containsExactlyInOrder(row("some text", 123042)); + + onHive().executeQuery("DROP TABLE test_avro_schema_literal_hive"); + } + + @Test(groups = AVRO) + public void testPrestoCreatedTable() + { + onPresto().executeQuery("DROP TABLE IF EXISTS test_avro_schema_literal_presto"); + onPresto().executeQuery(format("CREATE TABLE test_avro_schema_literal_presto (dummy_col VARCHAR) WITH (format='AVRO', avro_schema_literal='%s')", SCHEMA_LITERAL)); + onPresto().executeQuery("INSERT INTO test_avro_schema_literal_presto VALUES ('some text', 123042)"); + + assertThat(onHive().executeQuery("SELECT * FROM test_avro_schema_literal_presto")).containsExactlyInOrder(row("some text", 123042)); + assertThat(onPresto().executeQuery("SELECT * FROM test_avro_schema_literal_presto")).containsExactlyInOrder(row("some text", 123042)); + + onPresto().executeQuery("DROP TABLE test_avro_schema_literal_presto"); + } +}