Skip to content

Commit

Permalink
Cleanup in RcFilePageSourceFactory
Browse files Browse the repository at this point in the history
Get types directly from HiveColumnHandle instead of generating them
from HiveType.
  • Loading branch information
Praveen2112 committed Jun 28, 2023
1 parent 97f6de6 commit e1a16eb
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.HiveConfig;
import io.trino.plugin.hive.HivePageSourceFactory;
import io.trino.plugin.hive.HiveTimestampPrecision;
import io.trino.plugin.hive.ReaderColumns;
import io.trino.plugin.hive.ReaderPageSource;
import io.trino.plugin.hive.acid.AcidTransaction;
Expand All @@ -46,7 +45,6 @@
import io.trino.spi.connector.EmptyPageSource;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.joda.time.DateTimeZone;

Expand All @@ -61,7 +59,6 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns;
import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision;
import static io.trino.plugin.hive.ReaderPageSource.noProjectionAdaptation;
import static io.trino.plugin.hive.util.HiveClassNames.COLUMNAR_SERDE_CLASS;
import static io.trino.plugin.hive.util.HiveClassNames.LAZY_BINARY_COLUMNAR_SERDE_CLASS;
Expand All @@ -76,15 +73,13 @@ public class RcFilePageSourceFactory
{
private static final DataSize BUFFER_SIZE = DataSize.of(8, Unit.MEGABYTE);

private final TypeManager typeManager;
private final TrinoFileSystemFactory fileSystemFactory;
private final FileFormatDataSourceStats stats;
private final DateTimeZone timeZone;

@Inject
public RcFilePageSourceFactory(TypeManager typeManager, TrinoFileSystemFactory fileSystemFactory, FileFormatDataSourceStats stats, HiveConfig hiveConfig)
public RcFilePageSourceFactory(TrinoFileSystemFactory fileSystemFactory, FileFormatDataSourceStats stats, HiveConfig hiveConfig)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.stats = requireNonNull(stats, "stats is null");
this.timeZone = hiveConfig.getRcfileDateTimeZone();
Expand Down Expand Up @@ -167,9 +162,8 @@ else if (deserializerClassName.equals(COLUMNAR_SERDE_CLASS)) {

try {
ImmutableMap.Builder<Integer, Type> readColumns = ImmutableMap.builder();
HiveTimestampPrecision timestampPrecision = getTimestampPrecision(session);
for (HiveColumnHandle column : projectedReaderColumns) {
readColumns.put(column.getBaseHiveColumnIndex(), column.getHiveType().getType(typeManager, timestampPrecision));
readColumns.put(column.getBaseHiveColumnIndex(), column.getType());
}

RcFileReader rcFileReader = new RcFileReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public static Set<HivePageSourceFactory> getDefaultHivePageSourceFactories(HdfsE
.add(new RegexPageSourceFactory(fileSystemFactory, stats, hiveConfig))
.add(new SimpleTextFilePageSourceFactory(fileSystemFactory, stats, hiveConfig))
.add(new SimpleSequenceFilePageSourceFactory(fileSystemFactory, stats, hiveConfig))
.add(new RcFilePageSourceFactory(TESTING_TYPE_MANAGER, fileSystemFactory, stats, hiveConfig))
.add(new RcFilePageSourceFactory(fileSystemFactory, stats, hiveConfig))
.add(new OrcPageSourceFactory(new OrcReaderConfig(), fileSystemFactory, stats, hiveConfig))
.add(new ParquetPageSourceFactory(fileSystemFactory, stats, new ParquetReaderConfig(), hiveConfig))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public void testRcTextPageSource(int rowCount, long fileSizePadding)
.withColumns(TEST_COLUMNS)
.withRowsCount(rowCount)
.withFileSizePadding(fileSizePadding)
.isReadableByPageSource(new RcFilePageSourceFactory(TESTING_TYPE_MANAGER, FILE_SYSTEM_FACTORY, STATS, new HiveConfig()));
.isReadableByPageSource(new RcFilePageSourceFactory(FILE_SYSTEM_FACTORY, STATS, new HiveConfig()));
}

@Test(dataProvider = "rowCount")
Expand All @@ -300,7 +300,7 @@ public void testRcTextOptimizedWriter(int rowCount)
.withRowsCount(rowCount)
.withFileWriterFactory(new RcFileFileWriterFactory(FILE_SYSTEM_FACTORY, TESTING_TYPE_MANAGER, new NodeVersion("test"), HIVE_STORAGE_TIME_ZONE))
.isReadableByRecordCursor(createGenericHiveRecordCursorProvider(HDFS_ENVIRONMENT))
.isReadableByPageSource(new RcFilePageSourceFactory(TESTING_TYPE_MANAGER, FILE_SYSTEM_FACTORY, STATS, new HiveConfig()));
.isReadableByPageSource(new RcFilePageSourceFactory(FILE_SYSTEM_FACTORY, STATS, new HiveConfig()));
}

@Test(dataProvider = "rowCount")
Expand All @@ -317,7 +317,7 @@ public void testRcBinaryPageSource(int rowCount)
assertThatFileFormat(RCBINARY)
.withColumns(testColumns)
.withRowsCount(rowCount)
.isReadableByPageSource(new RcFilePageSourceFactory(TESTING_TYPE_MANAGER, FILE_SYSTEM_FACTORY, STATS, new HiveConfig()));
.isReadableByPageSource(new RcFilePageSourceFactory(FILE_SYSTEM_FACTORY, STATS, new HiveConfig()));
}

@Test(dataProvider = "rowCount")
Expand All @@ -342,7 +342,7 @@ public void testRcBinaryOptimizedWriter(int rowCount)
// generic Hive writer corrupts timestamps
.withSkipGenericWriterTest()
.withFileWriterFactory(new RcFileFileWriterFactory(FILE_SYSTEM_FACTORY, TESTING_TYPE_MANAGER, new NodeVersion("test"), HIVE_STORAGE_TIME_ZONE))
.isReadableByPageSource(new RcFilePageSourceFactory(TESTING_TYPE_MANAGER, FILE_SYSTEM_FACTORY, STATS, new HiveConfig()))
.isReadableByPageSource(new RcFilePageSourceFactory(FILE_SYSTEM_FACTORY, STATS, new HiveConfig()))
.withColumns(testColumnsNoTimestamps)
.isReadableByRecordCursor(createGenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));
}
Expand Down Expand Up @@ -574,13 +574,13 @@ public void testTruncateVarcharColumn()
assertThatFileFormat(RCTEXT)
.withWriteColumns(ImmutableList.of(writeColumn))
.withReadColumns(ImmutableList.of(readColumn))
.isReadableByPageSource(new RcFilePageSourceFactory(TESTING_TYPE_MANAGER, FILE_SYSTEM_FACTORY, STATS, new HiveConfig()))
.isReadableByPageSource(new RcFilePageSourceFactory(FILE_SYSTEM_FACTORY, STATS, new HiveConfig()))
.isReadableByRecordCursor(createGenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));

assertThatFileFormat(RCBINARY)
.withWriteColumns(ImmutableList.of(writeColumn))
.withReadColumns(ImmutableList.of(readColumn))
.isReadableByPageSource(new RcFilePageSourceFactory(TESTING_TYPE_MANAGER, FILE_SYSTEM_FACTORY, STATS, new HiveConfig()))
.isReadableByPageSource(new RcFilePageSourceFactory(FILE_SYSTEM_FACTORY, STATS, new HiveConfig()))
.isReadableByRecordCursor(createGenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));

assertThatFileFormat(ORC)
Expand Down Expand Up @@ -809,7 +809,7 @@ public void testRCTextProjectedColumnsPageSource(int rowCount)
.withWriteColumns(writeColumns)
.withReadColumns(readColumns)
.withRowsCount(rowCount)
.isReadableByPageSource(new RcFilePageSourceFactory(TESTING_TYPE_MANAGER, FILE_SYSTEM_FACTORY, STATS, new HiveConfig()));
.isReadableByPageSource(new RcFilePageSourceFactory(FILE_SYSTEM_FACTORY, STATS, new HiveConfig()));
}

@Test(dataProvider = "rowCount")
Expand Down Expand Up @@ -842,7 +842,7 @@ public void testRCBinaryProjectedColumns(int rowCount)
// generic Hive writer corrupts timestamps
.withSkipGenericWriterTest()
.withFileWriterFactory(new RcFileFileWriterFactory(FILE_SYSTEM_FACTORY, TESTING_TYPE_MANAGER, new NodeVersion("test"), HIVE_STORAGE_TIME_ZONE))
.isReadableByPageSource(new RcFilePageSourceFactory(TESTING_TYPE_MANAGER, FILE_SYSTEM_FACTORY, STATS, new HiveConfig()));
.isReadableByPageSource(new RcFilePageSourceFactory(FILE_SYSTEM_FACTORY, STATS, new HiveConfig()));
}

@Test(dataProvider = "rowCount")
Expand Down Expand Up @@ -872,7 +872,7 @@ public void testRCBinaryProjectedColumnsPageSource(int rowCount)
// generic Hive writer corrupts timestamps
.withSkipGenericWriterTest()
.withFileWriterFactory(new RcFileFileWriterFactory(FILE_SYSTEM_FACTORY, TESTING_TYPE_MANAGER, new NodeVersion("test"), HIVE_STORAGE_TIME_ZONE))
.isReadableByPageSource(new RcFilePageSourceFactory(TESTING_TYPE_MANAGER, FILE_SYSTEM_FACTORY, STATS, new HiveConfig()));
.isReadableByPageSource(new RcFilePageSourceFactory(FILE_SYSTEM_FACTORY, STATS, new HiveConfig()));
}

@Test
Expand All @@ -889,12 +889,12 @@ public void testFailForLongVarcharPartitionColumn()

assertThatFileFormat(RCTEXT)
.withColumns(columns)
.isFailingForPageSource(new RcFilePageSourceFactory(TESTING_TYPE_MANAGER, FILE_SYSTEM_FACTORY, STATS, new HiveConfig()), expectedErrorCode, expectedMessage)
.isFailingForPageSource(new RcFilePageSourceFactory(FILE_SYSTEM_FACTORY, STATS, new HiveConfig()), expectedErrorCode, expectedMessage)
.isFailingForRecordCursor(createGenericHiveRecordCursorProvider(HDFS_ENVIRONMENT), expectedErrorCode, expectedMessage);

assertThatFileFormat(RCBINARY)
.withColumns(columns)
.isFailingForPageSource(new RcFilePageSourceFactory(TESTING_TYPE_MANAGER, FILE_SYSTEM_FACTORY, STATS, new HiveConfig()), expectedErrorCode, expectedMessage)
.isFailingForPageSource(new RcFilePageSourceFactory(FILE_SYSTEM_FACTORY, STATS, new HiveConfig()), expectedErrorCode, expectedMessage)
.isFailingForRecordCursor(createGenericHiveRecordCursorProvider(HDFS_ENVIRONMENT), expectedErrorCode, expectedMessage);

assertThatFileFormat(ORC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import static io.trino.parquet.writer.ParquetSchemaConverter.HIVE_PARQUET_USE_LEGACY_DECIMAL_ENCODING;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
import static org.joda.time.DateTimeZone.UTC;

public final class StandardFileFormats
Expand All @@ -73,7 +72,7 @@ public HiveStorageFormat getFormat()
@Override
public Optional<HivePageSourceFactory> getHivePageSourceFactory(HdfsEnvironment hdfsEnvironment)
{
return Optional.of(new RcFilePageSourceFactory(TESTING_TYPE_MANAGER, HDFS_FILE_SYSTEM_FACTORY, new FileFormatDataSourceStats(), new HiveConfig().setRcfileTimeZone("UTC")));
return Optional.of(new RcFilePageSourceFactory(HDFS_FILE_SYSTEM_FACTORY, new FileFormatDataSourceStats(), new HiveConfig().setRcfileTimeZone("UTC")));
}

@Override
Expand Down Expand Up @@ -104,7 +103,7 @@ public HiveStorageFormat getFormat()
@Override
public Optional<HivePageSourceFactory> getHivePageSourceFactory(HdfsEnvironment hdfsEnvironment)
{
return Optional.of(new RcFilePageSourceFactory(TESTING_TYPE_MANAGER, HDFS_FILE_SYSTEM_FACTORY, new FileFormatDataSourceStats(), new HiveConfig().setRcfileTimeZone("UTC")));
return Optional.of(new RcFilePageSourceFactory(HDFS_FILE_SYSTEM_FACTORY, new FileFormatDataSourceStats(), new HiveConfig().setRcfileTimeZone("UTC")));
}

@Override
Expand Down

0 comments on commit e1a16eb

Please sign in to comment.