avro.schema.literal support and some related patches #5101

Closed · wants to merge 5 commits
@@ -103,6 +103,8 @@
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.VarcharType;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.TableType;
@@ -180,6 +182,7 @@
import static io.trino.plugin.hive.HiveSessionProperties.isSortedWritingEnabled;
import static io.trino.plugin.hive.HiveSessionProperties.isStatisticsEnabled;
import static io.trino.plugin.hive.HiveTableProperties.ANALYZE_COLUMNS_PROPERTY;
import static io.trino.plugin.hive.HiveTableProperties.AVRO_SCHEMA_LITERAL;
import static io.trino.plugin.hive.HiveTableProperties.AVRO_SCHEMA_URL;
import static io.trino.plugin.hive.HiveTableProperties.BUCKETED_BY_PROPERTY;
import static io.trino.plugin.hive.HiveTableProperties.BUCKET_COUNT_PROPERTY;
@@ -198,6 +201,7 @@
import static io.trino.plugin.hive.HiveTableProperties.TEXTFILE_FIELD_SEPARATOR;
import static io.trino.plugin.hive.HiveTableProperties.TEXTFILE_FIELD_SEPARATOR_ESCAPE;
import static io.trino.plugin.hive.HiveTableProperties.getAnalyzeColumns;
import static io.trino.plugin.hive.HiveTableProperties.getAvroSchemaLiteral;
import static io.trino.plugin.hive.HiveTableProperties.getAvroSchemaUrl;
import static io.trino.plugin.hive.HiveTableProperties.getBucketProperty;
import static io.trino.plugin.hive.HiveTableProperties.getExternalLocation;
@@ -298,6 +302,7 @@ public class HiveMetadata
private static final String NULL_FORMAT_KEY = serdeConstants.SERIALIZATION_NULL_FORMAT;

public static final String AVRO_SCHEMA_URL_KEY = "avro.schema.url";
public static final String AVRO_SCHEMA_LITERAL_KEY = "avro.schema.literal";
public static final String SPARK_TABLE_PROVIDER_KEY = "spark.sql.sources.provider";
public static final String DELTA_LAKE_PROVIDER = "delta";

@@ -625,6 +630,10 @@ private ConnectorTableMetadata doGetTableMetadata(ConnectorSession session, Sche
if (avroSchemaUrl != null) {
properties.put(AVRO_SCHEMA_URL, avroSchemaUrl);
}
String avroSchemaLiteral = table.getParameters().get(AVRO_SCHEMA_LITERAL_KEY);
if (avroSchemaLiteral != null) {
properties.put(AVRO_SCHEMA_LITERAL, avroSchemaLiteral);
}

// Textfile and CSV specific properties
getSerdeProperty(table, SKIP_HEADER_COUNT_KEY)
@@ -852,6 +861,10 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
throw new TrinoException(NOT_SUPPORTED, "Bucketing/Partitioning columns not supported when Avro schema url is set");
}

if ((bucketProperty.isPresent() || !partitionedBy.isEmpty()) && getAvroSchemaLiteral(tableMetadata.getProperties()) != null) {
throw new TrinoException(NOT_SUPPORTED, "Bucketing/Partitioning columns not supported when Avro schema literal is set");
}

validateTimestampColumns(tableMetadata.getColumns(), getTimestampPrecision(session));
List<HiveColumnHandle> columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy));
HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(tableMetadata.getProperties());
@@ -933,10 +946,16 @@ private Map<String, String> getEmptyTableProperties(ConnectorTableMetadata table

// Avro specific properties
String avroSchemaUrl = getAvroSchemaUrl(tableMetadata.getProperties());
String avroSchemaLiteral = getAvroSchemaLiteral(tableMetadata.getProperties());
if (avroSchemaUrl != null) {
checkState(avroSchemaLiteral == null, "avro_schema_url and avro_schema_literal cannot both be set");
checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.AVRO, AVRO_SCHEMA_URL);
tableProperties.put(AVRO_SCHEMA_URL_KEY, validateAndNormalizeAvroSchemaUrl(avroSchemaUrl, hdfsContext));
}
else if (avroSchemaLiteral != null) {
checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.AVRO, AVRO_SCHEMA_LITERAL);
tableProperties.put(AVRO_SCHEMA_LITERAL_KEY, validateAvroSchemaLiteral(avroSchemaLiteral));
}

// Textfile and CSV specific properties
Set<HiveStorageFormat> csvAndTextFile = ImmutableSet.of(HiveStorageFormat.TEXTFILE, HiveStorageFormat.CSV);
@@ -1053,6 +1072,17 @@ private String validateAndNormalizeAvroSchemaUrl(String url, HdfsContext context
}
}

private String validateAvroSchemaLiteral(String avroSchemaLiteral)
{
try {
new Schema.Parser().parse(avroSchemaLiteral);
return avroSchemaLiteral;
}
catch (SchemaParseException e) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "Failed to parse Avro schema: " + avroSchemaLiteral, e);
}
}

private static Path getExternalLocationAsPath(String location)
{
try {
@@ -1188,6 +1218,9 @@ private void failIfAvroSchemaIsSet(ConnectorSession session, HiveTableHandle han
if (table.getParameters().containsKey(AVRO_SCHEMA_URL_KEY) || table.getStorage().getSerdeParameters().containsKey(AVRO_SCHEMA_URL_KEY)) {
throw new TrinoException(NOT_SUPPORTED, "ALTER TABLE not supported when Avro schema url is set");
}
if (table.getParameters().containsKey(AVRO_SCHEMA_LITERAL_KEY) || table.getStorage().getSerdeParameters().containsKey(AVRO_SCHEMA_LITERAL_KEY)) {
throw new TrinoException(NOT_SUPPORTED, "ALTER TABLE not supported when Avro schema literal is set");
}
}

@Override
@@ -1310,6 +1343,10 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
throw new TrinoException(NOT_SUPPORTED, "CREATE TABLE AS not supported when Avro schema url is set");
}

if (getAvroSchemaLiteral(tableMetadata.getProperties()) != null) {
throw new TrinoException(NOT_SUPPORTED, "CREATE TABLE AS not supported when Avro schema literal is set");
}

HiveStorageFormat tableStorageFormat = getHiveStorageFormat(tableMetadata.getProperties());
List<String> partitionedBy = getPartitionedBy(tableMetadata.getProperties());
Optional<HiveBucketProperty> bucketProperty = getBucketProperty(tableMetadata.getProperties());
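A minimal, self-contained sketch of what `validateAvroSchemaLiteral` above relies on: Avro's `Schema.Parser` throws a `SchemaParseException` on a malformed literal, which the method rethrows as a `TrinoException` with `INVALID_TABLE_PROPERTY`. The schema strings below are illustrative only.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;

public class AvroLiteralValidationSketch
{
    public static void main(String[] args)
    {
        // A well-formed record schema parses cleanly; parse() returns the Schema.
        String valid = "{\"type\":\"record\",\"name\":\"event\","
                + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}";
        System.out.println(new Schema.Parser().parse(valid).getName()); // prints "event"

        // A bad literal (misspelled type) fails with SchemaParseException,
        // which validateAvroSchemaLiteral converts into INVALID_TABLE_PROPERTY.
        try {
            new Schema.Parser().parse("{\"type\":\"recod\",\"name\":\"event\",\"fields\":[]}");
        }
        catch (SchemaParseException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```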
@@ -62,6 +62,7 @@ public class HiveTableProperties
public static final String ORC_BLOOM_FILTER_COLUMNS = "orc_bloom_filter_columns";
public static final String ORC_BLOOM_FILTER_FPP = "orc_bloom_filter_fpp";
public static final String AVRO_SCHEMA_URL = "avro_schema_url";
public static final String AVRO_SCHEMA_LITERAL = "avro_schema_literal";
public static final String TEXTFILE_FIELD_SEPARATOR = "textfile_field_separator";
public static final String TEXTFILE_FIELD_SEPARATOR_ESCAPE = "textfile_field_separator_escape";
public static final String NULL_FORMAT_PROPERTY = "null_format";
@@ -148,6 +149,7 @@ public HiveTableProperties(
integerProperty(BUCKETING_VERSION, "Bucketing version", null, false),
integerProperty(BUCKET_COUNT_PROPERTY, "Number of buckets", 0, false),
stringProperty(AVRO_SCHEMA_URL, "URI pointing to Avro schema for the table", null, false),
stringProperty(AVRO_SCHEMA_LITERAL, "JSON-encoded Avro schema for the table", null, false),
integerProperty(SKIP_HEADER_LINE_COUNT, "Number of header lines", null, false),
integerProperty(SKIP_FOOTER_LINE_COUNT, "Number of footer lines", null, false),
stringProperty(TEXTFILE_FIELD_SEPARATOR, "TEXTFILE field separator character", null, false),
@@ -174,6 +176,11 @@ public static String getAvroSchemaUrl(Map<String, Object> tableProperties)
return (String) tableProperties.get(AVRO_SCHEMA_URL);
}

public static String getAvroSchemaLiteral(Map<String, Object> tableProperties)
{
return (String) tableProperties.get(AVRO_SCHEMA_LITERAL);
}

public static Optional<Integer> getHeaderSkipCount(Map<String, Object> tableProperties)
{
return Optional.ofNullable((Integer) tableProperties.get(SKIP_HEADER_LINE_COUNT));
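As a sketch of how the new property is consumed (not part of the diff): `avro_schema_literal` is declared with `stringProperty`, so `getAvroSchemaLiteral` is just a cast over the resolved property map, returning null when the property is unset. The map contents below are hypothetical; in SQL the property would presumably be supplied as, for example, `WITH (format = 'AVRO', avro_schema_literal = '...')`.

```java
import java.util.Map;

public class TablePropertyLookupSketch
{
    public static void main(String[] args)
    {
        // Stand-in for the resolved table-properties map that Trino passes to
        // HiveTableProperties.getAvroSchemaLiteral (hypothetical contents).
        Map<String, Object> tableProperties = Map.of(
                "avro_schema_literal",
                "{\"type\":\"record\",\"name\":\"t\",\"fields\":[{\"name\":\"x\",\"type\":\"int\"}]}");

        // Mirrors the getter above: a plain cast, null when the property is absent.
        String literal = (String) tableProperties.get("avro_schema_literal");
        System.out.println(literal);
    }
}
```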
@@ -43,6 +43,7 @@
import io.trino.spi.type.VarcharType;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.metastore.ProtectMode;
import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

@@ -57,10 +58,9 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.HiveMetadata.AVRO_SCHEMA_URL_KEY;
import static io.trino.plugin.hive.HiveSplitManager.PRESTO_OFFLINE;
import static io.trino.plugin.hive.HiveStorageFormat.AVRO;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.NUM_ROWS;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.predicate.TupleDomain.withColumnDomains;
@@ -95,6 +95,7 @@ public static Properties getHiveSchema(Table table)
// Mimics function in Hive: MetaStoreUtils.getTableMetadata(Table)
return getHiveSchema(
table.getStorage(),
Optional.empty(),
table.getDataColumns(),
table.getDataColumns(),
table.getParameters(),
@@ -108,6 +109,7 @@ public static Properties getHiveSchema(Partition partition, Table table)
// Mimics function in Hive: MetaStoreUtils.getSchema(Partition, Table)
return getHiveSchema(
partition.getStorage(),
Optional.of(table.getStorage()),
partition.getColumns(),
table.getDataColumns(),
table.getParameters(),
@@ -118,6 +120,7 @@ public static Properties getHiveSchema(Partition partition, Table table)

private static Properties getHiveSchema(
Storage sd,
Optional<Storage> tableSd,
List<Column> dataColumns,
List<Column> tableDataColumns,
Map<String, String> parameters,
@@ -147,6 +150,14 @@ private static Properties getHiveSchema(
for (Map.Entry<String, String> param : sd.getSerdeParameters().entrySet()) {
schema.setProperty(param.getKey(), (param.getValue() != null) ? param.getValue() : "");
}
// Sometimes the Avro schema is stored in the table-level SerDe parameters. In that case, the table-level schema
// should override the partition-level schema: we pass the table-level schema to GenericHiveRecordCursor and let
// the Avro reader handle schema evolution itself.
if (sd.getStorageFormat().getSerDe().equals(AvroSerDe.class.getName()) && tableSd.isPresent()) {
for (Map.Entry<String, String> param : tableSd.get().getSerdeParameters().entrySet()) {
schema.setProperty(param.getKey(), nullToEmpty(param.getValue()));
}
}
schema.setProperty(SERIALIZATION_LIB, sd.getStorageFormat().getSerDe());

StringBuilder columnNameBuilder = new StringBuilder();
@@ -213,13 +224,6 @@ public static ProtectMode getProtectMode(Table table)
return getProtectMode(table.getParameters());
}

public static boolean isAvroTableWithSchemaSet(Table table)
{
return AVRO.getSerDe().equals(table.getStorage().getStorageFormat().getSerDeNullable()) &&
(table.getParameters().get(AVRO_SCHEMA_URL_KEY) != null ||
(table.getStorage().getSerdeParameters().get(AVRO_SCHEMA_URL_KEY) != null));
}

public static String makePartitionName(Table table, Partition partition)
{
return makePartitionName(table.getPartitionColumns(), partition.getValues());
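The new comment in `getHiveSchema` argues that the table-level Avro schema should override partition-level SerDe parameters because the Avro reader resolves writer/reader schema differences on its own. A standalone sketch of that resolution, with made-up schemas: a record written under an older (partition-side) schema is read back under a newer (table-side) schema that adds a defaulted field.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaEvolutionSketch
{
    public static void main(String[] args)
            throws Exception
    {
        Schema writerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"t\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
        // Reader schema adds a field with a default, as a table-level schema might.
        Schema readerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"t\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"long\"},"
                        + "{\"name\":\"note\",\"type\":\"string\",\"default\":\"n/a\"}]}");

        // Serialize a record under the writer (older) schema.
        GenericRecord record = new GenericData.Record(writerSchema);
        record.put("id", 42L);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
        encoder.flush();

        // Read it back under the reader (newer) schema; Avro fills in the default.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
                new ByteArrayInputStream(out.toByteArray()), null);
        GenericRecord evolved = new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
                .read(null, decoder);
        System.out.println(evolved); // {"id": 42, "note": "n/a"}
    }
}
```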
@@ -53,13 +53,13 @@

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.hive.metastore.MetastoreUtil.isAvroTableWithSchemaSet;
import static io.trino.plugin.hive.metastore.MetastoreUtil.verifyCanDropColumn;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.csvSchemaFields;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromMetastoreApiDatabase;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromMetastoreApiTable;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.isAvroTableWithSchemaSet;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.isCsvTable;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.isPartitionSerdesUsingMetastoreForSchema;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.isTableSerdesUsingMetastoreForSchema;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.toMetastoreApiDatabase;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.toMetastoreApiTable;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
@@ -94,12 +94,12 @@ public List<String> getAllDatabases()
public Optional<Table> getTable(HiveIdentity identity, String databaseName, String tableName)
{
return delegate.getTable(identity, databaseName, tableName).map(table -> {
if (isAvroTableWithSchemaSet(table)) {
return fromMetastoreApiTable(table, delegate.getFields(identity, databaseName, tableName).orElseThrow());
}
if (isCsvTable(table)) {
return fromMetastoreApiTable(table, csvSchemaFields(table.getSd().getCols()));
}
if (!isTableSerdesUsingMetastoreForSchema(table)) {
return fromMetastoreApiTable(table, delegate.getFields(identity, databaseName, tableName).orElseThrow());
}
return fromMetastoreApiTable(table);
});
}
@@ -381,7 +381,10 @@ public Map<String, Optional<Partition>> getPartitionsByNames(HiveIdentity identi

private Partition fromMetastoreApiPartition(Table table, org.apache.hadoop.hive.metastore.api.Partition partition)
{
if (isAvroTableWithSchemaSet(table)) {
// If a partition's SerDe does not use the metastore for its schema (as Avro does not), use the table-level
// schema instead of fetching the schema from the partition's SerDe. Data publishers are responsible for
// ensuring that the table-level schema works for these partitions.
if (!isPartitionSerdesUsingMetastoreForSchema(partition)) {
List<FieldSchema> schema = table.getDataColumns().stream()
.map(ThriftMetastoreUtil::toMetastoreApiFieldSchema)
.collect(toImmutableList());
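`isTableSerdesUsingMetastoreForSchema` and `isPartitionSerdesUsingMetastoreForSchema` are referenced here but defined elsewhere in the PR (in `ThriftMetastoreUtil`). A rough sketch of the idea, assuming they compare the storage descriptor's SerDe class against the SerDes known to self-describe their schema; the set below is hypothetical and incomplete.

```java
import java.util.Set;

public final class SerdeSchemaSourceSketch
{
    // Hypothetical set: SerDes that carry or reference their own schema, so the
    // column list stored in the metastore may not be the effective schema.
    private static final Set<String> SERDES_NOT_USING_METASTORE_FOR_SCHEMA =
            Set.of("org.apache.hadoop.hive.serde2.avro.AvroSerDe");

    private SerdeSchemaSourceSketch() {}

    public static boolean isSerdeUsingMetastoreForSchema(String serdeClassName)
    {
        return !SERDES_NOT_USING_METASTORE_FOR_SCHEMA.contains(serdeClassName);
    }
}
```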
@@ -136,7 +136,7 @@
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromRolePrincipalGrants;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromTrinoPrincipalType;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.getHiveBasicStatistics;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.isAvroTableWithSchemaSet;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.isTableSerdesUsingMetastoreForSchema;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.parsePrivilege;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.toMetastoreApiPartition;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.updateStatisticsParameters;
@@ -535,8 +535,8 @@ public void updateTableStatistics(HiveIdentity identity, String databaseName, St
List<ColumnStatisticsObj> metastoreColumnStatistics = updatedStatistics.getColumnStatistics().entrySet().stream()
.flatMap(entry -> {
Optional<Column> column = table.getColumn(entry.getKey());
if (column.isEmpty() && isAvroTableWithSchemaSet(modifiedTable)) {
// Avro table can have different effective schema than declared in metastore. Still, metastore does not allow
if (column.isEmpty() && !isTableSerdesUsingMetastoreForSchema(modifiedTable)) {
// Some tables can have an effective schema different from the one declared in the metastore. Still, the
// metastore does not allow storing statistics for a column it does not know about.
return Stream.of();
}