From 364dc922ddcf249e5f782cc2329cf8c534febec6 Mon Sep 17 00:00:00 2001
From: Rob Marrowstone
Date: Tue, 20 Aug 2024 15:27:36 -0700
Subject: [PATCH] Revert "WIP Avro and Test Changes"

This reverts commit b96dc9e3445ff29cf2b1fb6e3fe3b71031e394b7.
---
 .../hive/formats/avro/AvroFileReader.java     |  18 +--
 .../hive/formats/avro/AvroPageDataReader.java |  10 +-
 .../plugin/hive/avro/AvroPageSource.java      |  13 --
 .../hive/avro/AvroPageSourceFactory.java      |   4 +-
 .../plugin/hive/BaseHiveConnectorTest.java    |   2 +-
 .../plugin/hive/TestHiveFileFormats.java      |   2 +-
 .../trino/plugin/hive/TestNestedPruning.java  | 126 +-----------------
 7 files changed, 14 insertions(+), 161 deletions(-)

diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroFileReader.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroFileReader.java
index fd3e943c6053..406cfd67a337 100644
--- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroFileReader.java
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroFileReader.java
@@ -57,23 +57,9 @@ public AvroFileReader(
             long offset,
             OptionalLong length)
             throws IOException, AvroTypeException
-    {
-        this(inputFile, schema, schema, avroTypeBlockHandler, offset, length);
-    }
-
-    public AvroFileReader(
-            TrinoInputFile inputFile,
-            Schema writerSchema,
-            Schema readerSchema,
-            AvroTypeBlockHandler avroTypeBlockHandler,
-            long offset,
-            OptionalLong length)
-            throws IOException, AvroTypeException
     {
         requireNonNull(inputFile, "inputFile is null");
-        requireNonNull(readerSchema, "reader schema is null");
-        requireNonNull(writerSchema, "writer schema is null");
-
+        requireNonNull(schema, "schema is null");
         requireNonNull(avroTypeBlockHandler, "avroTypeBlockHandler is null");

         long fileSize = inputFile.length();
@@ -83,7 +69,7 @@ public AvroFileReader(
         end = length.stream().map(l -> l + offset).findFirst();
         end.ifPresent(endLong -> verify(endLong <= fileSize, "offset plus length is greater than data size"));
         input = new TrinoDataInputStream(inputFile.newStream());
-        dataReader = new AvroPageDataReader(writerSchema, readerSchema, avroTypeBlockHandler);
+        dataReader = new AvroPageDataReader(schema, avroTypeBlockHandler);
         try {
             fileReader = new DataFileReader<>(new TrinoDataInputStreamAsAvroSeekableInput(input, fileSize), dataReader);
             fileReader.sync(offset);
diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroPageDataReader.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroPageDataReader.java
index 29bde34409eb..418f6e1505b1 100644
--- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroPageDataReader.java
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroPageDataReader.java
@@ -37,17 +37,11 @@ public class AvroPageDataReader
     private RowBlockBuildingDecoder rowBlockBuildingDecoder;
     private final AvroTypeBlockHandler typeManager;

-    public AvroPageDataReader(Schema schema, AvroTypeBlockHandler typeManager)
-            throws AvroTypeException
-    {
-        this(schema, schema, typeManager);
-    }
-
-    public AvroPageDataReader(Schema writerSchema, Schema readerSchema, AvroTypeBlockHandler typeManager)
+    public AvroPageDataReader(Schema readerSchema, AvroTypeBlockHandler typeManager)
             throws AvroTypeException
     {
         this.readerSchema = requireNonNull(readerSchema, "readerSchema is null");
-        this.writerSchema = requireNonNull(writerSchema, "writerSchema is null");
+        writerSchema = this.readerSchema;
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
null"); verifyNoCircularReferences(readerSchema); try { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSource.java index 9f1b999d5b86..54503fc61779 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSource.java @@ -50,19 +50,6 @@ public AvroPageSource( avroFileReader = new AvroFileReader(inputFile, schema, avroTypeManager, offset, OptionalLong.of(length)); } - public AvroPageSource( - TrinoInputFile inputFile, - Schema writerSchema, - Schema readerSchema, - AvroTypeBlockHandler avroTypeManager, - long offset, - long length) - throws IOException, AvroTypeException - { - fileName = requireNonNull(inputFile, "inputFile is null").location().fileName(); - avroFileReader = new AvroFileReader(inputFile, writerSchema, readerSchema, avroTypeManager, offset, OptionalLong.of(length)); - } - @Override public long getCompletedBytes() { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSourceFactory.java index de158128f48f..525654d81ef4 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSourceFactory.java @@ -175,9 +175,7 @@ public Optional createPageSource( } try { - return Optional.of( - new ReaderPageSource( - new AvroPageSource(inputFile, maskedSchema, new HiveAvroTypeBlockHandler(createTimestampType(hiveTimestampPrecision.getPrecision())), start, length), readerProjections)); + return Optional.of(new ReaderPageSource(new AvroPageSource(inputFile, maskedSchema, new HiveAvroTypeBlockHandler(createTimestampType(hiveTimestampPrecision.getPrecision())), start, length), readerProjections)); } catch (IOException e) { throw new TrinoException(HIVE_CANNOT_OPEN_SPLIT, e); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index 00c01499597c..c3f44cd4ae14 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -5733,7 +5733,7 @@ private String testReadWithPartitionSchemaMismatchAddedColumns(Session session, public void testSubfieldReordering() { // Validate for formats for which subfield access is name based - List formats = ImmutableList.of(HiveStorageFormat.AVRO); + List formats = ImmutableList.of(HiveStorageFormat.ORC, HiveStorageFormat.PARQUET, HiveStorageFormat.AVRO); String tableName = "evolve_test_" + randomNameSuffix(); for (HiveStorageFormat format : formats) { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java index 492fc3732ca5..da439242ccfc 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java @@ -1386,7 +1386,7 @@ private static void createTestFileTrino( hiveFileWriter.commit(); } - static void writeValue(Type type, BlockBuilder builder, Object object) + private static void writeValue(Type type, BlockBuilder builder, Object object) { requireNonNull(builder, 
"builder is null"); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNestedPruning.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNestedPruning.java index 099b7aab24d5..6ff69c37ae25 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNestedPruning.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNestedPruning.java @@ -22,21 +22,13 @@ import io.trino.filesystem.TrinoOutputFile; import io.trino.filesystem.memory.MemoryFileSystemFactory; import io.trino.metastore.HiveType; -import io.trino.metastore.StorageFormat; -import io.trino.plugin.hive.avro.AvroFileWriterFactory; -import io.trino.plugin.hive.avro.AvroPageSourceFactory; -import io.trino.plugin.hive.line.OpenXJsonFileWriterFactory; import io.trino.plugin.hive.line.OpenXJsonPageSourceFactory; -import io.trino.plugin.hive.util.HiveTypeTranslator; -import io.trino.spi.Page; -import io.trino.spi.PageBuilder; import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.BooleanType; import io.trino.spi.type.IntegerType; import io.trino.spi.type.RowType; -import io.trino.spi.type.Type; import io.trino.spi.type.VarcharType; import io.trino.testing.MaterializedResult; import org.junit.jupiter.api.Assertions; @@ -54,19 +46,16 @@ import static io.trino.hive.thrift.metastore.hive_metastoreConstants.FILE_INPUT_FORMAT; import static io.trino.plugin.hive.HivePageSourceProvider.ColumnMapping.buildColumnMappings; -import static io.trino.plugin.hive.HiveStorageFormat.AVRO; import static io.trino.plugin.hive.HiveStorageFormat.OPENX_JSON; import static io.trino.plugin.hive.HiveTestUtils.getHiveSession; import static io.trino.plugin.hive.HiveTestUtils.projectedColumn; import static io.trino.plugin.hive.HiveTestUtils.toHiveBaseColumnHandle; -import static io.trino.plugin.hive.TestHiveFileFormats.writeValue; import static io.trino.plugin.hive.acid.AcidTransaction.NO_ACID_TRANSACTION; import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMNS; import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMN_TYPES; import static io.trino.plugin.hive.util.SerdeConstants.SERIALIZATION_LIB; import static io.trino.spi.type.RowType.field; import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER; -import static java.util.stream.Collectors.toList; /** * This test proves that non-dereferenced fields are pruned from nested RowTypes. 
@@ -205,72 +194,23 @@ public void testProjectionsFromDifferentPartsOfSameBase()
                 List.of(false, 31));
     }

-    @Test
-    public void testWriteThenRead()
-            throws IOException
-    {
-        HiveColumnHandle someOtherColumn = toHiveBaseColumnHandle("something_else", VarcharType.VARCHAR, 1);
-        List<HiveColumnHandle> writeColumns = List.of(tableColumns.get(0), someOtherColumn);
-
-        assertRoundTrip(
-                writeColumns,
-                List.of(
-                        List.of(List.of(true, "bar", 31), "spam")),
-                writeColumns,
-                List.of(
-                        someOtherColumn,
-                        projectedColumn(tableColumns.get(0), "basic_int"),
-                        projectedColumn(tableColumns.get(0), "basic_bool")),
-                List.of("spam", 31, true));
-    }
-
     private void assertValues(List<HiveColumnHandle> projectedColumns, String text, List<Object> expected)
             throws IOException
     {
         TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory();
-        Location location = Location.of("memory:///test");
+        Location location = Location.of("memory:///test.ion");

         final ConnectorSession session = getHiveSession(new HiveConfig());

         writeTextFile(text, location, fileSystemFactory.create(session));

-        HivePageSourceFactory pageSourceFactory = new OpenXJsonPageSourceFactory(fileSystemFactory, new HiveConfig());
-        try (ConnectorPageSource pageSource = createPageSource(pageSourceFactory, OPENX_JSON, fileSystemFactory, location, tableColumns, projectedColumns, session)) {
+        try (ConnectorPageSource pageSource = createPageSource(fileSystemFactory, location, tableColumns, projectedColumns, session)) {
             final MaterializedResult result = MaterializedResult.materializeSourceDataStream(session, pageSource, projectedColumns.stream().map(HiveColumnHandle::getType).toList());
             Assertions.assertEquals(1, result.getRowCount());
             Assertions.assertEquals(expected, result.getMaterializedRows().getFirst().getFields());
         }
     }

-    private void assertRoundTrip(
-            List<HiveColumnHandle> writeColumns,
-            List<Object> writeValues,
-            List<HiveColumnHandle> readColumns,
-            List<HiveColumnHandle> projections,
-            List<Object> expected)
-            throws IOException
-    {
-        TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory();
-        Location location = Location.of("memory:///test");
-
-        final ConnectorSession session = getHiveSession(new HiveConfig());
-
-        writeObjectsToFile(
-                new AvroFileWriterFactory(fileSystemFactory, TESTING_TYPE_MANAGER, new NodeVersion("test_version")),
-                AVRO,
-                writeValues,
-                writeColumns,
-                location,
-                session);
-
-        HivePageSourceFactory pageSourceFactory = new AvroPageSourceFactory(fileSystemFactory);
-        try (ConnectorPageSource pageSource = createPageSource(pageSourceFactory, AVRO, fileSystemFactory, location, readColumns, projections, session)) {
-            final MaterializedResult result = MaterializedResult.materializeSourceDataStream(session, pageSource, projections.stream().map(HiveColumnHandle::getType).toList());
-            Assertions.assertEquals(1, result.getRowCount());
-            Assertions.assertEquals(expected, result.getMaterializedRows().getFirst().getFields());
-        }
-    }
-
     private int writeTextFile(String text, Location location, TrinoFileSystem fileSystem)
             throws IOException
     {
@@ -285,64 +225,10 @@ private int writeTextFile(String text, Location location, TrinoFileSy
         return written;
     }

-    private void writeObjectsToFile(
-            HiveFileWriterFactory fileWriterFactory,
-            HiveStorageFormat storageFormat,
-            List<Object> objects,
-            List<HiveColumnHandle> columns,
-            Location location,
-            ConnectorSession session) {
-
-        columns = columns.stream()
-                .filter(c -> c.getColumnType().equals(HiveColumnHandle.ColumnType.REGULAR))
-                .toList();
-        List<Type> types = columns.stream()
-                .map(HiveColumnHandle::getType)
-                .collect(toList());
-
-        PageBuilder pageBuilder = new PageBuilder(types);
-        for (Object row : objects) {
-            pageBuilder.declarePosition();
-            for (int col = 0; col < columns.size(); col++) {
-                Type type = types.get(col);
-                Object value = ((List)row).get(col);
-
-                writeValue(type, pageBuilder.getBlockBuilder(col), value);
-            }
-        }
-        Page page = pageBuilder.build();
-
-        Map<String, String> tableProperties = ImmutableMap.<String, String>builder()
-                .put(LIST_COLUMNS, columns.stream().map(HiveColumnHandle::getName).collect(Collectors.joining(",")))
-                .put(LIST_COLUMN_TYPES, columns.stream().map(HiveColumnHandle::getType).map(HiveTypeTranslator::toHiveType).map(HiveType::toString).collect(Collectors.joining(",")))
-                .buildOrThrow();
-
-
-        Optional<FileWriter> fileWriter = fileWriterFactory.createFileWriter(
-                location,
-                columns.stream()
-                        .map(HiveColumnHandle::getName)
-                        .toList(),
-                storageFormat.toStorageFormat(),
-                HiveCompressionCodec.NONE,
-                tableProperties,
-                session,
-                OptionalInt.empty(),
-                NO_ACID_TRANSACTION,
-                false,
-                WriterKind.INSERT);
-
-        FileWriter hiveFileWriter = fileWriter.orElseThrow(() -> new IllegalArgumentException("fileWriterFactory"));
-        hiveFileWriter.appendRows(page);
-        hiveFileWriter.commit();
-    }
-
     /**
      * todo: this is very similar to what's in TestOrcPredicates, factor out.
      */
     private static ConnectorPageSource createPageSource(
-            HivePageSourceFactory pageSourceFactory,
-            HiveStorageFormat storageFormat,
             TrinoFileSystemFactory fileSystemFactory,
             Location location,
             List<HiveColumnHandle> tableColumns,
@@ -350,6 +236,8 @@ private static ConnectorPageSource createPageSource(
             ConnectorSession session)
             throws IOException
     {
+        OpenXJsonPageSourceFactory factory = new OpenXJsonPageSourceFactory(fileSystemFactory, new HiveConfig());
+
         long length = fileSystemFactory.create(session).newInputFile(location).length();

         List<ColumnMapping> columnMappings = buildColumnMappings(
@@ -364,14 +252,14 @@ private static ConnectorPageSource createPageSource(
                 Instant.now().toEpochMilli());

         final Map<String, String> tableProperties = ImmutableMap.<String, String>builder()
-                .put(FILE_INPUT_FORMAT, storageFormat.getInputFormat())
-                .put(SERIALIZATION_LIB, storageFormat.getSerde())
+                .put(FILE_INPUT_FORMAT, OPENX_JSON.getInputFormat())
+                .put(SERIALIZATION_LIB, OPENX_JSON.getSerde())
                 .put(LIST_COLUMNS, tableColumns.stream().map(HiveColumnHandle::getName).collect(Collectors.joining(",")))
                 .put(LIST_COLUMN_TYPES, tableColumns.stream().map(HiveColumnHandle::getHiveType).map(HiveType::toString).collect(Collectors.joining(",")))
                 .buildOrThrow();

         return HivePageSourceProvider.createHivePageSource(
-                ImmutableSet.of(pageSourceFactory),
+                ImmutableSet.of(factory),
                 session,
                 location,
                 OptionalInt.empty(),