Add parquet_use_column_names session property
Yaliang authored and nezihyigitbasi committed Jul 9, 2018
1 parent 609b2f6 commit 8e47c5b
Showing 8 changed files with 98 additions and 78 deletions.
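The new session property lets users choose, per query, whether Parquet columns are resolved by name from the file schema instead of by ordinal position. A minimal usage sketch in Presto SQL, assuming the Hive connector is mounted as a catalog named hive (the catalog name is deployment-specific):

    -- The session property is qualified by the catalog name; "hive" is
    -- an assumption about how the connector is mounted.
    SET SESSION hive.parquet_use_column_names = true;
    -- Queries in this session now match Parquet columns by file column
    -- name rather than by index.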
HiveSessionProperties.java
@@ -61,6 +61,7 @@ public final class HiveSessionProperties
     private static final String RESPECT_TABLE_FORMAT = "respect_table_format";
     private static final String PARQUET_PREDICATE_PUSHDOWN_ENABLED = "parquet_predicate_pushdown_enabled";
     private static final String PARQUET_OPTIMIZED_READER_ENABLED = "parquet_optimized_reader_enabled";
+    private static final String PARQUET_USE_COLUMN_NAME = "parquet_use_column_names";
     private static final String MAX_SPLIT_SIZE = "max_split_size";
     private static final String MAX_INITIAL_SPLIT_SIZE = "max_initial_split_size";
     public static final String RCFILE_OPTIMIZED_WRITER_ENABLED = "rcfile_optimized_writer_enabled";
@@ -212,6 +213,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
                         "Experimental: Parquet: Enable predicate pushdown for Parquet",
                         hiveClientConfig.isParquetPredicatePushdownEnabled(),
                         false),
+                booleanSessionProperty(
+                        PARQUET_USE_COLUMN_NAME,
+                        "Experimental: Parquet: Access Parquet columns using names from the file",
+                        hiveClientConfig.isUseParquetColumnNames(),
+                        false),
                 dataSizeSessionProperty(
                         MAX_SPLIT_SIZE,
                         "Max split size",
@@ -379,6 +385,11 @@ public static boolean isParquetPredicatePushdownEnabled(ConnectorSession session
         return session.getProperty(PARQUET_PREDICATE_PUSHDOWN_ENABLED, Boolean.class);
     }
 
+    public static boolean isUseParquetColumnNames(ConnectorSession session)
+    {
+        return session.getProperty(PARQUET_USE_COLUMN_NAME, Boolean.class);
+    }
+
     public static DataSize getMaxSplitSize(ConnectorSession session)
     {
         return session.getProperty(MAX_SPLIT_SIZE, DataSize.class);
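Because the session property's default is wired to hiveClientConfig.isUseParquetColumnNames(), the existing connector-level configuration still acts as the cluster-wide default. A sketch of the catalog file, assuming the conventional etc/catalog/hive.properties layout and the hive.parquet.use-column-names key that backs this getter (path, connector name, and key are all assumptions about the standard Hive connector setup):

    # etc/catalog/hive.properties -- path, connector name, and key are
    # assumptions based on the standard Hive connector configuration
    connector.name=hive-hadoop2
    hive.parquet.use-column-names=true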
ParquetPageSourceFactory.java
@@ -15,7 +15,6 @@
 
 import com.facebook.presto.hive.FileFormatDataSourceStats;
 import com.facebook.presto.hive.HdfsEnvironment;
-import com.facebook.presto.hive.HiveClientConfig;
 import com.facebook.presto.hive.HiveColumnHandle;
 import com.facebook.presto.hive.HivePageSourceFactory;
 import com.facebook.presto.hive.parquet.predicate.ParquetPredicate;
@@ -57,6 +56,7 @@
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA;
 import static com.facebook.presto.hive.HiveSessionProperties.isParquetOptimizedReaderEnabled;
 import static com.facebook.presto.hive.HiveSessionProperties.isParquetPredicatePushdownEnabled;
+import static com.facebook.presto.hive.HiveSessionProperties.isUseParquetColumnNames;
 import static com.facebook.presto.hive.HiveUtil.getDeserializerClassName;
 import static com.facebook.presto.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource;
 import static com.facebook.presto.hive.parquet.ParquetTypeUtils.getColumnIO;
@@ -80,20 +80,13 @@ public class ParquetPageSourceFactory
             .build();
 
     private final TypeManager typeManager;
-    private final boolean useParquetColumnNames;
     private final HdfsEnvironment hdfsEnvironment;
     private final FileFormatDataSourceStats stats;
 
     @Inject
-    public ParquetPageSourceFactory(TypeManager typeManager, HiveClientConfig config, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats)
-    {
-        this(typeManager, requireNonNull(config, "hiveClientConfig is null").isUseParquetColumnNames(), hdfsEnvironment, stats);
-    }
-
-    public ParquetPageSourceFactory(TypeManager typeManager, boolean useParquetColumnNames, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats)
+    public ParquetPageSourceFactory(TypeManager typeManager, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats)
     {
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
-        this.useParquetColumnNames = useParquetColumnNames;
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.stats = requireNonNull(stats, "stats is null");
     }
@@ -129,7 +122,7 @@ public Optional<? extends ConnectorPageSource> createPageSource(
                 fileSize,
                 schema,
                 columns,
-                useParquetColumnNames,
+                isUseParquetColumnNames(session),
                 typeManager,
                 isParquetPredicatePushdownEnabled(session),
                 effectivePredicate,
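Taken together with the HiveSessionProperties change, the effect is that the column-name flag is no longer frozen into the factory when it is constructed from HiveClientConfig; it is read from the ConnectorSession on each createPageSource call, so the config value now only seeds the session property's default and a user can override it per query without restarting the server.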
ParquetRecordCursorProvider.java
@@ -15,7 +15,6 @@
 
 import com.facebook.presto.hive.FileFormatDataSourceStats;
 import com.facebook.presto.hive.HdfsEnvironment;
-import com.facebook.presto.hive.HiveClientConfig;
 import com.facebook.presto.hive.HiveColumnHandle;
 import com.facebook.presto.hive.HiveRecordCursorProvider;
 import com.facebook.presto.spi.ConnectorSession;
@@ -35,6 +34,7 @@
 import java.util.Set;
 
 import static com.facebook.presto.hive.HiveSessionProperties.isParquetPredicatePushdownEnabled;
+import static com.facebook.presto.hive.HiveSessionProperties.isUseParquetColumnNames;
 import static com.facebook.presto.hive.HiveUtil.getDeserializerClassName;
 import static java.util.Objects.requireNonNull;
 
@@ -46,19 +46,12 @@ public class ParquetRecordCursorProvider
             .add("parquet.hive.serde.ParquetHiveSerDe")
             .build();
 
-    private final boolean useParquetColumnNames;
     private final HdfsEnvironment hdfsEnvironment;
     private final FileFormatDataSourceStats stats;
 
     @Inject
-    public ParquetRecordCursorProvider(HiveClientConfig hiveClientConfig, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats)
+    public ParquetRecordCursorProvider(HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats)
     {
-        this(requireNonNull(hiveClientConfig, "hiveClientConfig is null").isUseParquetColumnNames(), hdfsEnvironment, stats);
-    }
-
-    public ParquetRecordCursorProvider(boolean useParquetColumnNames, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats)
-    {
-        this.useParquetColumnNames = useParquetColumnNames;
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.stats = requireNonNull(stats, "stats is null");
     }
@@ -91,7 +84,7 @@ public Optional<RecordCursor> createRecordCursor(
                 fileSize,
                 schema,
                 columns,
-                useParquetColumnNames,
+                isUseParquetColumnNames(session),
                 typeManager,
                 isParquetPredicatePushdownEnabled(session),
                 effectivePredicate,
@@ -74,15 +74,15 @@ public static Set<HivePageSourceFactory> getDefaultHiveDataStreamFactories(HiveC
                 .add(new RcFilePageSourceFactory(TYPE_MANAGER, testHdfsEnvironment, stats))
                 .add(new OrcPageSourceFactory(TYPE_MANAGER, hiveClientConfig, testHdfsEnvironment, stats))
                 .add(new DwrfPageSourceFactory(TYPE_MANAGER, testHdfsEnvironment, stats))
-                .add(new ParquetPageSourceFactory(TYPE_MANAGER, hiveClientConfig, testHdfsEnvironment, stats))
+                .add(new ParquetPageSourceFactory(TYPE_MANAGER, testHdfsEnvironment, stats))
                 .build();
     }
 
     public static Set<HiveRecordCursorProvider> getDefaultHiveRecordCursorProvider(HiveClientConfig hiveClientConfig)
     {
         HdfsEnvironment testHdfsEnvironment = createTestHdfsEnvironment(hiveClientConfig);
         return ImmutableSet.<HiveRecordCursorProvider>builder()
-                .add(new ParquetRecordCursorProvider(hiveClientConfig, testHdfsEnvironment, new FileFormatDataSourceStats()))
+                .add(new ParquetRecordCursorProvider(testHdfsEnvironment, new FileFormatDataSourceStats()))
                 .add(new GenericHiveRecordCursorProvider(testHdfsEnvironment))
                 .build();
     }
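Once a coordinator runs this change, the property should appear in the session property listing. A quick check from the Presto CLI (column layout abbreviated; the default value shown assumes the connector-level setting was left unset):

    SHOW SESSION;
    -- expect a row resembling:
    -- hive.parquet_use_column_names | false | false | boolean |
    --     Experimental: Parquet: Access Parquet columns using names from the file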
(Diffs for the remaining four changed files did not load and are not shown.)
