Extract columnMappings argument
sopel39 committed Nov 5, 2021
1 parent 0fc4494 commit 4978cd6
Showing 4 changed files with 74 additions and 42 deletions.
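In outline, the commit hoists a value that createHivePageSource previously derived internally (the list of column mappings) out to its callers, which now build it once and pass it in as an argument; the parameters that existed only to feed that derivation are dropped. A minimal sketch of this "extract argument" pattern, using hypothetical names and simplified types rather than the real Hive connector classes:

    import java.util.List;

    public class ExtractArgumentSketch
    {
        // Before: the method re-derived the mappings from several of its own
        // parameters, so every caller had to supply partitionName and columns
        // even when it already had (or wanted to control) the mappings.
        static String createPageSourceBefore(String partitionName, List<String> columns)
        {
            List<String> mappings = buildMappings(partitionName, columns);
            return readWith(mappings);
        }

        // After: the caller builds the mappings and passes them in; the
        // feeder parameters disappear from the signature.
        static String createPageSourceAfter(List<String> mappings)
        {
            return readWith(mappings);
        }

        // Stand-in for ColumnMapping.buildColumnMappings(...).
        static List<String> buildMappings(String partitionName, List<String> columns)
        {
            return columns.stream().map(c -> partitionName + "/" + c).toList();
        }

        static String readWith(List<String> mappings)
        {
            return String.join(", ", mappings);
        }

        public static void main(String[] args)
        {
            List<String> mappings = buildMappings("ds=2021-11-05", List.of("a", "b"));
            System.out.println(createPageSourceAfter(mappings)); // ds=2021-11-05/a, ds=2021-11-05/b
        }
    }

Besides shortening the parameter list, this lets the production caller keep the mappings for its own use and lets the tests construct them explicitly, as the diffs below show.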
@@ -168,6 +168,17 @@ public ConnectorPageSource createPageSource(
         Path path = new Path(hiveSplit.getPath());
         boolean originalFile = ORIGINAL_FILE_PATH_MATCHER.matcher(path.toString()).matches();
 
+        List<ColumnMapping> columnMappings = ColumnMapping.buildColumnMappings(
+                hiveSplit.getPartitionName(),
+                hiveSplit.getPartitionKeys(),
+                hiveColumns,
+                hiveSplit.getBucketConversion().map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()),
+                hiveSplit.getTableToPartitionMapping(),
+                path,
+                hiveSplit.getBucketNumber(),
+                hiveSplit.getEstimatedFileSize(),
+                hiveSplit.getFileModifiedTime());
+
         Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session), path);
 
         TupleDomain<HiveColumnHandle> simplifiedDynamicFilter = dynamicFilter
@@ -183,20 +194,17 @@ public ConnectorPageSource createPageSource(
                 hiveSplit.getStart(),
                 hiveSplit.getLength(),
                 hiveSplit.getEstimatedFileSize(),
-                hiveSplit.getFileModifiedTime(),
                 hiveSplit.getSchema(),
                 hiveTable.getCompactEffectivePredicate().intersect(simplifiedDynamicFilter),
                 hiveColumns,
-                hiveSplit.getPartitionName(),
-                hiveSplit.getPartitionKeys(),
                 typeManager,
-                hiveSplit.getTableToPartitionMapping(),
                 hiveSplit.getBucketConversion(),
                 hiveSplit.getBucketValidation(),
                 hiveSplit.isS3SelectPushdownEnabled(),
                 hiveSplit.getAcidInfo(),
                 originalFile,
-                hiveTable.getTransaction());
+                hiveTable.getTransaction(),
+                columnMappings);
 
         if (pageSource.isPresent()) {
             ConnectorPageSource source = pageSource.get();
@@ -247,35 +255,22 @@ public static Optional<ConnectorPageSource> createHivePageSource(
             long start,
             long length,
             long estimatedFileSize,
-            long fileModifiedTime,
             Properties schema,
             TupleDomain<HiveColumnHandle> effectivePredicate,
             List<HiveColumnHandle> columns,
-            String partitionName,
-            List<HivePartitionKey> partitionKeys,
             TypeManager typeManager,
-            TableToPartitionMapping tableToPartitionMapping,
             Optional<BucketConversion> bucketConversion,
             Optional<BucketValidation> bucketValidation,
             boolean s3SelectPushdownEnabled,
             Optional<AcidInfo> acidInfo,
             boolean originalFile,
-            AcidTransaction transaction)
+            AcidTransaction transaction,
+            List<ColumnMapping> columnMappings)
     {
         if (effectivePredicate.isNone()) {
             return Optional.of(new EmptyPageSource());
         }
 
-        List<ColumnMapping> columnMappings = ColumnMapping.buildColumnMappings(
-                partitionName,
-                partitionKeys,
-                columns,
-                bucketConversion.map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()),
-                tableToPartitionMapping,
-                path,
-                bucketNumber,
-                estimatedFileSize,
-                fileModifiedTime);
         List<ColumnMapping> regularAndInterimColumnMappings = ColumnMapping.extractRegularAndInterimColumnMappings(columnMappings);
 
         Optional<BucketAdaptation> bucketAdaptation = createBucketAdaptation(bucketConversion, bucketNumber, regularAndInterimColumnMappings);
@@ -69,6 +69,7 @@
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.airlift.slice.Slices.utf8Slice;
+import static io.trino.plugin.hive.HivePageSourceProvider.ColumnMapping.buildColumnMappings;
 import static io.trino.plugin.hive.HiveStorageFormat.AVRO;
 import static io.trino.plugin.hive.HiveStorageFormat.CSV;
 import static io.trino.plugin.hive.HiveStorageFormat.JSON;
@@ -922,6 +923,19 @@ private ConnectorPageSource createPageSourceFromCursorProvider(
 
         Configuration configuration = new Configuration(false);
         configuration.set("io.compression.codecs", LzoCodec.class.getName() + "," + LzopCodec.class.getName());
+
+        List<HiveColumnHandle> columnHandles = getColumnHandles(testReadColumns);
+        List<HivePageSourceProvider.ColumnMapping> columnMappings = buildColumnMappings(
+                partitionName,
+                partitionKeys,
+                columnHandles,
+                ImmutableList.of(),
+                TableToPartitionMapping.empty(),
+                split.getPath(),
+                OptionalInt.empty(),
+                fileSize,
+                Instant.now().toEpochMilli());
+
         Optional<ConnectorPageSource> pageSource = HivePageSourceProvider.createHivePageSource(
                 ImmutableSet.of(),
                 ImmutableSet.of(cursorProvider),
@@ -932,20 +946,17 @@ private ConnectorPageSource createPageSourceFromCursorProvider(
                 split.getStart(),
                 split.getLength(),
                 fileSize,
-                Instant.now().toEpochMilli(),
                 splitProperties,
                 TupleDomain.all(),
-                getColumnHandles(testReadColumns),
-                partitionName,
-                partitionKeys,
+                columnHandles,
                 TYPE_MANAGER,
-                TableToPartitionMapping.empty(),
                 Optional.empty(),
                 Optional.empty(),
                 false,
                 Optional.empty(),
                 false,
-                NO_ACID_TRANSACTION);
+                NO_ACID_TRANSACTION,
+                columnMappings);
 
         return pageSource.get();
     }
@@ -992,6 +1003,17 @@ private void testPageSourceFactory(
 
         List<HiveColumnHandle> columnHandles = getColumnHandles(testReadColumns);
 
+        List<HivePageSourceProvider.ColumnMapping> columnMappings = buildColumnMappings(
+                partitionName,
+                partitionKeys,
+                columnHandles,
+                ImmutableList.of(),
+                TableToPartitionMapping.empty(),
+                split.getPath(),
+                OptionalInt.empty(),
+                fileSize,
+                Instant.now().toEpochMilli());
+
         Optional<ConnectorPageSource> pageSource = HivePageSourceProvider.createHivePageSource(
                 ImmutableSet.of(sourceFactory),
                 ImmutableSet.of(),
@@ -1002,20 +1024,17 @@ private void testPageSourceFactory(
                 split.getStart(),
                 split.getLength(),
                 fileSize,
-                Instant.now().toEpochMilli(),
                 splitProperties,
                 TupleDomain.all(),
                 columnHandles,
-                partitionName,
-                partitionKeys,
                 TYPE_MANAGER,
-                TableToPartitionMapping.empty(),
                 Optional.empty(),
                 Optional.empty(),
                 false,
                 Optional.empty(),
                 false,
-                NO_ACID_TRANSACTION);
+                NO_ACID_TRANSACTION,
+                columnMappings);
 
         assertTrue(pageSource.isPresent());
 
@@ -102,6 +102,7 @@
 import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
 import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
 import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn;
+import static io.trino.plugin.hive.HivePageSourceProvider.ColumnMapping.buildColumnMappings;
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
 import static io.trino.plugin.hive.HiveTestUtils.SESSION;
 import static io.trino.plugin.hive.HiveTestUtils.TYPE_MANAGER;
@@ -551,6 +552,18 @@ public ConnectorPageSource newPageSource()
     public ConnectorPageSource newPageSource(FileFormatDataSourceStats stats, ConnectorSession session)
     {
         OrcPageSourceFactory orcPageSourceFactory = new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, stats, UTC);
+
+        List<HivePageSourceProvider.ColumnMapping> columnMappings = buildColumnMappings(
+                partitionName,
+                partitionKeys,
+                columns,
+                ImmutableList.of(),
+                TableToPartitionMapping.empty(),
+                fileSplit.getPath(),
+                OptionalInt.empty(),
+                fileSplit.getLength(),
+                Instant.now().toEpochMilli());
+
         return HivePageSourceProvider.createHivePageSource(
                 ImmutableSet.of(orcPageSourceFactory),
                 ImmutableSet.of(),
@@ -561,21 +574,17 @@ public ConnectorPageSource newPageSource(FileFormatDataSourceStats stats, ConnectorSession session)
                 fileSplit.getStart(),
                 fileSplit.getLength(),
                 fileSplit.getLength(),
-                Instant.now().toEpochMilli(),
                 schema,
                 TupleDomain.all(),
                 columns,
-                partitionName,
-                partitionKeys,
                 TYPE_MANAGER,
-                TableToPartitionMapping.empty(),
                 Optional.empty(),
                 Optional.empty(),
                 false,
                 Optional.empty(),
                 false,
-                NO_ACID_TRANSACTION)
-                .orElseThrow();
+                NO_ACID_TRANSACTION,
+                columnMappings).orElseThrow();
     }
 
     public SourceOperator newTableScanOperator(DriverContext driverContext)
@@ -48,6 +48,7 @@
 
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.plugin.hive.HivePageSourceProvider.ColumnMapping.buildColumnMappings;
 import static io.trino.plugin.hive.HiveStorageFormat.ORC;
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
 import static io.trino.plugin.hive.HiveTestUtils.TYPE_MANAGER;
@@ -204,6 +205,17 @@ private ConnectorPageSource createPageSource(
             return handle.get();
         });
 
+        List<HivePageSourceProvider.ColumnMapping> columnMappings = buildColumnMappings(
+                partitionName,
+                partitionKeys,
+                columnHandles,
+                ImmutableList.of(),
+                TableToPartitionMapping.empty(),
+                split.getPath(),
+                OptionalInt.empty(),
+                split.getLength(),
+                Instant.now().toEpochMilli());
+
         Optional<ConnectorPageSource> pageSource = HivePageSourceProvider.createHivePageSource(
                 ImmutableSet.of(readerFactory),
                 ImmutableSet.of(),
@@ -214,20 +226,17 @@ private ConnectorPageSource createPageSource(
                 split.getStart(),
                 split.getLength(),
                 split.getLength(),
-                Instant.now().toEpochMilli(),
                 splitProperties,
                 predicate,
                 columnHandles,
-                partitionName,
-                partitionKeys,
                 TYPE_MANAGER,
-                TableToPartitionMapping.empty(),
                 Optional.empty(),
                 Optional.empty(),
                 false,
                 Optional.empty(),
                 false,
-                NO_ACID_TRANSACTION);
+                NO_ACID_TRANSACTION,
+                columnMappings);
 
         assertTrue(pageSource.isPresent());
         return pageSource.get();