From 7db44780a179cbc4cd20f78140470d989f480f75 Mon Sep 17 00:00:00 2001
From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com>
Date: Sun, 17 Sep 2023 23:53:37 +0200
Subject: [PATCH 1/6] PARQUET-2347: Add interface layer for Configuration

Add interface layer for the Hadoop Configuration.
---
 .../parquet/avro/AvroParquetReader.java        | 24 +++
 .../parquet/avro/AvroParquetWriter.java        | 13 ++
 .../apache/parquet/avro/AvroReadSupport.java   | 23 ++-
 .../parquet/avro/AvroSchemaConverter.java      |  6 +
 .../apache/parquet/avro/AvroWriteSupport.java  | 14 +-
 .../apache/parquet/avro/TestReadWrite.java     | 62 ++++++--
 .../avro/TestReadWriteOldListBehavior.java     |  7 +
 .../parquet/conf/ParquetConfiguration.java     | 62 ++++++++
 .../org/apache/parquet/util/Reflection.java    | 50 +++++++
 .../org/apache/parquet/HadoopReadOptions.java  | 33 +----
 .../apache/parquet/ParquetReadOptions.java     | 83 ++++++++++-
 .../conf/HadoopParquetConfiguration.java       | 140 ++++++++++++++++++
 .../apache/parquet/hadoop/CodecFactory.java    | 22 ++-
 .../org/apache/parquet/hadoop/DirectZstd.java  | 14 ++
 .../hadoop/InternalParquetRecordReader.java    | 12 +-
 .../parquet/hadoop/ParquetInputFormat.java     | 23 +++
 .../apache/parquet/hadoop/ParquetReader.java   | 58 ++++++--
 .../apache/parquet/hadoop/ParquetWriter.java   | 61 +++++++-
 .../hadoop/api/DelegatingReadSupport.java      | 10 ++
 .../hadoop/api/DelegatingWriteSupport.java     |  6 +
 .../parquet/hadoop/api/InitContext.java        | 16 +-
 .../parquet/hadoop/api/ReadSupport.java        | 39 ++++-
 .../parquet/hadoop/api/WriteSupport.java       | 10 ++
 .../hadoop/example/ExampleParquetWriter.java   |  6 +
 .../hadoop/example/GroupReadSupport.java       | 16 ++
 .../hadoop/example/GroupWriteSupport.java      | 11 ++
 .../hadoop/util/ConfigurationUtil.java         | 26 +++-
 .../parquet/hadoop/util/HadoopCodecs.java      |  5 +
 .../hadoop/util/SerializationUtil.java         | 18 ++-
 .../org/apache/parquet/DirectWriterTest.java   |  6 +
 .../SchemaControlEncryptionTest.java           | 16 ++
 .../parquet/hadoop/TestEncryptionOptions.java  |  2 +-
 .../apache/parquet/pig/TupleReadSupport.java   | 36 ++++-
 .../apache/parquet/pig/TupleWriteSupport.java  |  7 +
 .../parquet/pig/TestTupleRecordConsumer.java   |  3 +-
 .../parquet/pig/TupleConsumerPerfTest.java     |  7 +-
 .../parquet/proto/ProtoMessageConverter.java   | 17 ++-
 .../parquet/proto/ProtoParquetWriter.java      | 14 +-
 .../parquet/proto/ProtoReadSupport.java        |  9 +-
 .../parquet/proto/ProtoRecordConverter.java    |  6 +
 .../proto/ProtoRecordMaterializer.java         |  6 +
 .../parquet/proto/ProtoSchemaConverter.java    | 14 +-
 .../parquet/proto/ProtoWriteSupport.java       |  7 +
 .../thrift/AbstractThriftWriteSupport.java     | 19 ++-
 .../hadoop/thrift/TBaseWriteSupport.java       | 12 +-
 .../thrift/ThriftBytesWriteSupport.java        | 22 ++-
 .../hadoop/thrift/ThriftReadSupport.java       | 76 ++++++--
 .../hadoop/thrift/ThriftWriteSupport.java      |  6 +
 .../parquet/thrift/ParquetWriteProtocol.java   |  7 +
 .../parquet/thrift/TBaseRecordConverter.java   |  9 +-
 .../parquet/thrift/ThriftRecordConverter.java  | 16 +-
 .../thrift/ThriftSchemaConvertVisitor.java     | 12 ++
 .../parquet/thrift/ThriftSchemaConverter.java  | 13 +-
 .../thrift/pig/TupleToThriftWriteSupport.java  |  9 +-
 .../thrift/TestParquetWriteProtocol.java       |  3 +-
 pom.xml                                        |  2 +
 56 files changed, 1109 insertions(+), 117 deletions(-)
 create mode 100644 parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java
 create mode 100644 parquet-common/src/main/java/org/apache/parquet/util/Reflection.java
 create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/conf/HadoopParquetConfiguration.java
diff --git
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java index 8970b66eae..3c98948b69 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.filter.UnboundRecordFilter; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.api.ReadSupport; @@ -53,6 +54,10 @@ public static Builder builder(InputFile file) { return new Builder(file); } + public static Builder builder(InputFile file, ParquetConfiguration conf) { + return new Builder(file, conf); + } + /** * Convenience method for creating a ParquetReader which uses Avro * {@link GenericData} objects to store data from reads. @@ -67,6 +72,21 @@ public static ParquetReader genericRecordReader(InputFile file) t return new Builder(file).withDataModel(GenericData.get()).build(); } + /** + * Convenience method for creating a ParquetReader which uses Avro + * {@link GenericData} objects to store data from reads. + * + * @param file The location to read data from + * @param conf The configuration to use + * @return A {@code ParquetReader} which reads data as Avro + * {@code GenericData} + * @throws IOException if the InputFile has been closed, or if some other I/O + * error occurs + */ + public static ParquetReader genericRecordReader(InputFile file, ParquetConfiguration conf) throws IOException { + return new Builder(file, conf).withDataModel(GenericData.get()).build(); + } + /** * Convenience method for creating a ParquetReader which uses Avro * {@link GenericData} objects to store data from reads. 
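As a usage sketch (not part of this diff), the ParquetConfiguration-based entry points added to AvroParquetReader above might be exercised as follows. The file path and the property being toggled are illustrative assumptions; the builder(InputFile, ParquetConfiguration) overload and HadoopParquetConfiguration come from this patch, and LocalInputFile is the existing helper already used in the tests below.

    import java.nio.file.Paths;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.avro.AvroReadSupport;
    import org.apache.parquet.conf.HadoopParquetConfiguration;
    import org.apache.parquet.conf.ParquetConfiguration;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.io.LocalInputFile;

    public class ReadWithParquetConfigurationSketch {
      public static void main(String[] args) throws Exception {
        // Any ParquetConfiguration implementation can be passed; this patch ships a Hadoop-backed one.
        ParquetConfiguration conf = new HadoopParquetConfiguration(true);
        conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);

        // builder(InputFile, ParquetConfiguration) is the overload added by this change.
        try (ParquetReader<GenericRecord> reader =
            AvroParquetReader.<GenericRecord>builder(new LocalInputFile(Paths.get("/tmp/example.parquet")), conf)
                .withDataModel(GenericData.get())
                .build()) {
          GenericRecord record;
          while ((record = reader.read()) != null) {
            System.out.println(record);
          }
        }
      }
    }

The genericRecordReader(file, conf) convenience method added above is the one-call equivalent of the builder chain shown here.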
@@ -143,6 +163,10 @@ private Builder(InputFile file) { super(file); } + private Builder(InputFile file, ParquetConfiguration conf) { + super(file, conf); + } + public Builder withDataModel(GenericData model) { this.model = model; diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java index 94d8167b0a..0d87d007f9 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java @@ -25,6 +25,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -154,6 +156,12 @@ private static WriteSupport writeSupport(Schema avroSchema, private static WriteSupport writeSupport(Configuration conf, Schema avroSchema, GenericData model) { + return writeSupport(new HadoopParquetConfiguration(conf), avroSchema, model); + } + + private static WriteSupport writeSupport(ParquetConfiguration conf, + Schema avroSchema, + GenericData model) { return new AvroWriteSupport( new AvroSchemaConverter(conf).convert(avroSchema), avroSchema, model); } @@ -189,5 +197,10 @@ protected Builder self() { protected WriteSupport getWriteSupport(Configuration conf) { return AvroParquetWriter.writeSupport(conf, schema, model); } + + @Override + protected WriteSupport getWriteSupport(ParquetConfiguration conf) { + return AvroParquetWriter.writeSupport(conf, schema, model); + } } } diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java index 8f268a145a..9b165548e5 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java @@ -24,7 +24,10 @@ import org.apache.avro.generic.GenericData; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; import org.slf4j.Logger; @@ -95,6 +98,13 @@ public AvroReadSupport(GenericData model) { public ReadContext init(Configuration configuration, Map keyValueMetaData, MessageType fileSchema) { + return init(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema); + } + + @Override + public ReadContext init(ParquetConfiguration configuration, + Map keyValueMetaData, + MessageType fileSchema) { MessageType projection = fileSchema; Map metadata = new LinkedHashMap(); @@ -120,6 +130,13 @@ public ReadContext init(Configuration configuration, public RecordMaterializer prepareForRead( Configuration configuration, Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) { + return prepareForRead(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema, readContext); + } + + @Override + public RecordMaterializer prepareForRead( + 
ParquetConfiguration configuration, Map keyValueMetaData, + MessageType fileSchema, ReadContext readContext) { Map metadata = readContext.getReadSupportMetadata(); MessageType parquetSchema = readContext.getRequestedSchema(); Schema avroSchema; @@ -154,6 +171,10 @@ private static RecordMaterializer newCompatMaterializer( } private GenericData getDataModel(Configuration conf, Schema schema) { + return getDataModel(new HadoopParquetConfiguration(conf), schema); + } + + private GenericData getDataModel(ParquetConfiguration conf, Schema schema) { if (model != null) { return model; } @@ -175,6 +196,6 @@ private GenericData getDataModel(Configuration conf, Schema schema) { Class suppClass = conf.getClass( AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class); - return ReflectionUtils.newInstance(suppClass, conf).get(); + return ReflectionUtils.newInstance(suppClass, ConfigurationUtil.createHadoopConfiguration(conf)).get(); } } diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 0314bcd71a..abf94eaa2c 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -23,6 +23,8 @@ import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.schema.ConversionPatterns; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -102,6 +104,10 @@ public AvroSchemaConverter() { } public AvroSchemaConverter(Configuration conf) { + this(new HadoopParquetConfiguration(conf)); + } + + public AvroSchemaConverter(ParquetConfiguration conf) { this.assumeRepeatedIsListElement = conf.getBoolean( ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT); this.writeOldListStructure = conf.getBoolean( diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java index 564e745392..401886e72e 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java @@ -34,7 +34,10 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.avro.util.Utf8; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.GroupType; @@ -129,6 +132,11 @@ public static void setSchema(Configuration configuration, Schema schema) { @Override public WriteContext init(Configuration configuration) { + return init(new HadoopParquetConfiguration(configuration)); + } + + @Override + public WriteContext init(ParquetConfiguration configuration) { if (rootAvroSchema == null) { this.rootAvroSchema = new Schema.Parser().parse(configuration.get(AVRO_SCHEMA)); this.rootSchema = new AvroSchemaConverter(configuration).convert(rootAvroSchema); @@ -405,6 +413,10 @@ private Binary fromAvroString(Object value) { } private static GenericData getDataModel(Configuration conf, Schema schema) { 
+ return getDataModel(new HadoopParquetConfiguration(conf), schema); + } + + private static GenericData getDataModel(ParquetConfiguration conf, Schema schema) { if (conf.get(AVRO_DATA_SUPPLIER) == null && schema != null) { GenericData modelForSchema; try { @@ -423,7 +435,7 @@ private static GenericData getDataModel(Configuration conf, Schema schema) { Class suppClass = conf.getClass( AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class); - return ReflectionUtils.newInstance(suppClass, conf).get(); + return ReflectionUtils.newInstance(suppClass, ConfigurationUtil.createHadoopConfiguration(conf)).get(); } private abstract class ListWriter { diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java index 81e751aba5..9aaa9e3b28 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java @@ -52,6 +52,8 @@ import org.apache.avro.util.Utf8; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; @@ -77,22 +79,31 @@ public class TestReadWrite { @Parameterized.Parameters public static Collection data() { Object[][] data = new Object[][] { - { false, false }, // use the new converters - { true, false }, // use the old converters - { false, true } }; // use a local disk location + { false, false, false }, // use the new converters with hadoop config + { true, false, false }, // use the old converters with hadoop config + { false, true, false }, // use a local disk location with hadoop config + { false, false, true }, // use the new converters with parquet config interface + { true, false, true }, // use the old converters with parquet config interface + { false, true, true } }; // use a local disk location with parquet config interface return Arrays.asList(data); } private final boolean compat; private final boolean local; + private final boolean confInterface; private final Configuration testConf = new Configuration(); + private final ParquetConfiguration parquetConf = new HadoopParquetConfiguration(true); - public TestReadWrite(boolean compat, boolean local) { + public TestReadWrite(boolean compat, boolean local, boolean confInterface) { this.compat = compat; this.local = local; + this.confInterface = confInterface; this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat); - testConf.setBoolean("parquet.avro.add-list-element-records", false); - testConf.setBoolean("parquet.avro.write-old-list-structure", false); + this.testConf.setBoolean("parquet.avro.add-list-element-records", false); + this.testConf.setBoolean("parquet.avro.write-old-list-structure", false); + this.parquetConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat); + this.parquetConf.setBoolean("parquet.avro.add-list-element-records", false); + this.parquetConf.setBoolean("parquet.avro.write-old-list-structure", false); } @Test @@ -431,6 +442,11 @@ public void testAllUsingDefaultAvroSchema() throws Exception { @Override public WriteContext init(Configuration configuration) { + return init(new HadoopParquetConfiguration(configuration)); + } + + @Override + public WriteContext init(ParquetConfiguration configuration) { return new 
WriteContext(MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA), new HashMap()); } @@ -864,30 +880,44 @@ private File createTempFile() throws IOException { } private ParquetWriter writer(String file, Schema schema) throws IOException { + AvroParquetWriter.Builder writerBuilder; if (local) { - return AvroParquetWriter + writerBuilder = AvroParquetWriter .builder(new LocalOutputFile(Paths.get(file))) - .withSchema(schema) - .withConf(testConf) - .build(); + .withSchema(schema); } else { - return AvroParquetWriter + writerBuilder = AvroParquetWriter .builder(new Path(file)) - .withSchema(schema) + .withSchema(schema); + } + if (confInterface) { + return writerBuilder + .withConf(parquetConf) + .build(); + } else { + return writerBuilder .withConf(testConf) .build(); } } private ParquetReader reader(String file) throws IOException { + AvroParquetReader.Builder readerBuilder; if (local) { - return AvroParquetReader + readerBuilder = AvroParquetReader .builder(new LocalInputFile(Paths.get(file))) - .withDataModel(GenericData.get()) - .withConf(testConf) + .withDataModel(GenericData.get()); + } else { + return new AvroParquetReader<>(testConf, new Path(file)); + } + if (confInterface) { + return readerBuilder + .withConf(parquetConf) .build(); } else { - return new AvroParquetReader(testConf, new Path(file)); + return readerBuilder + .withConf(testConf) + .build(); } } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java index f12417cae9..31a221d5bc 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java @@ -38,6 +38,8 @@ import org.apache.avro.util.Utf8; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.io.api.Binary; @@ -358,6 +360,11 @@ public void testAllUsingDefaultAvroSchema() throws Exception { @Override public WriteContext init(Configuration configuration) { + return init(new HadoopParquetConfiguration(configuration)); + } + + @Override + public WriteContext init(ParquetConfiguration configuration) { return new WriteContext(MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA), new HashMap()); } diff --git a/parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java b/parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java new file mode 100644 index 0000000000..8e7c80d37e --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.conf; + +import java.util.Map; + +/** + * Configuration interface with the methods necessary to configure for Parquet applications. + */ +public interface ParquetConfiguration extends Iterable> { + + void set(String name, String value); + + void setLong(String name, long value); + + void setInt(String name, int value); + + void setBoolean(String name, boolean value); + + void setStrings(String name, String... value); + + void setClass(String name, Class value, Class xface); + + String get(String name); + + String get(String name, String defaultValue); + + long getLong(String name, long defaultValue); + + int getInt(String name, int defaultValue); + + boolean getBoolean(String name, boolean defaultValue); + + String getTrimmed(String name); + + String getTrimmed(String name, String defaultValue); + + String[] getStrings(String name, String[] defaultValue); + + Class getClass(String name, Class defaultValue); + + Class getClass(String name, Class defaultValue, Class xface); + + Class getClassByName(String name) throws ClassNotFoundException; +} diff --git a/parquet-common/src/main/java/org/apache/parquet/util/Reflection.java b/parquet-common/src/main/java/org/apache/parquet/util/Reflection.java new file mode 100644 index 0000000000..695ebd9f46 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/util/Reflection.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.util; + +import java.lang.reflect.Constructor; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Lifted from Hadoop's org.apache.hadoop.util.ReflectionUtils. 
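The ParquetConfiguration interface defined above is a plain key/value surface mirroring the parts of Hadoop's Configuration that Parquet actually uses, so library code can be written against the interface rather than against Hadoop directly. A small sketch (not part of the patch; the property name and default value are illustrative) of what that looks like:

    import java.util.Map;
    import org.apache.parquet.conf.HadoopParquetConfiguration;
    import org.apache.parquet.conf.ParquetConfiguration;

    public class ParquetConfigurationSketch {

      // Hypothetical helper: it depends only on the ParquetConfiguration interface,
      // so any implementation (the Hadoop-backed one below, or a future non-Hadoop one) works.
      static int pageSize(ParquetConfiguration conf) {
        return conf.getInt("parquet.page.size", 1024 * 1024); // property name/default are illustrative
      }

      public static void main(String[] args) {
        ParquetConfiguration conf = new HadoopParquetConfiguration(false); // loadDefaults = false
        conf.setInt("parquet.page.size", 8 * 1024 * 1024);
        conf.setStrings("example.tags", "a", "b", "c");

        System.out.println("page size = " + pageSize(conf));

        // The interface extends Iterable over its entries, so settings can be enumerated.
        for (Map.Entry<String, String> entry : conf) {
          System.out.println(entry.getKey() + " = " + entry.getValue());
        }
      }
    }
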
+ */ +public class Reflection { + + private static final Class[] EMPTY_ARRAY = new Class[]{}; + private static final Map, Constructor> CONSTRUCTOR_CACHE = new ConcurrentHashMap, Constructor>(); + + @SuppressWarnings("unchecked") + public static T newInstance(Class theClass) { + T result; + try { + Constructor meth = (Constructor) CONSTRUCTOR_CACHE.get(theClass); + if (meth == null) { + meth = theClass.getDeclaredConstructor(EMPTY_ARRAY); + meth.setAccessible(true); + CONSTRUCTOR_CACHE.put(theClass, meth); + } + result = meth.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return result; + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java index 8f0d8d8933..9c98650100 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java @@ -23,29 +23,17 @@ import org.apache.hadoop.fs.Path; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.conf.HadoopParquetConfiguration; import org.apache.parquet.crypto.DecryptionPropertiesFactory; import org.apache.parquet.crypto.FileDecryptionProperties; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter; -import org.apache.parquet.hadoop.util.HadoopCodecs; import java.util.Map; -import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; -import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED; -import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY; - public class HadoopReadOptions extends ParquetReadOptions { private final Configuration conf; - private static final String ALLOCATION_SIZE = "parquet.read.allocation.size"; - private HadoopReadOptions(boolean useSignedStringMinMax, boolean useStatsFilter, boolean useDictionaryFilter, @@ -65,7 +53,7 @@ private HadoopReadOptions(boolean useSignedStringMinMax, super( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter, codecFactory, allocator, - maxAllocationSize, properties, fileDecryptionProperties + maxAllocationSize, properties, fileDecryptionProperties, new HadoopParquetConfiguration(conf) ); this.conf = conf; } @@ -100,24 +88,9 @@ public Builder(Configuration conf) { } public Builder(Configuration conf, Path filePath) { + super(new HadoopParquetConfiguration(conf)); this.conf = conf; this.filePath = filePath; - useSignedStringMinMax(conf.getBoolean("parquet.strings.signed-min-max.enabled", false)); - useDictionaryFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true)); - useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true)); - 
useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true)); - useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true)); - usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED, - usePageChecksumVerification)); - useBloomFilter(conf.getBoolean(BLOOM_FILTERING_ENABLED, true)); - useOffHeapDecryptBuffer(conf.getBoolean(OFF_HEAP_DECRYPT_BUFFER_ENABLED, false)); - withCodecFactory(HadoopCodecs.newFactory(conf, 0)); - withRecordFilter(getFilter(conf)); - withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608)); - String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY); - if (badRecordThresh != null) { - set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh); - } } @Override diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java index dc130ee8d2..28e90522e4 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java @@ -19,9 +19,12 @@ package org.apache.parquet; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.crypto.FileDecryptionProperties; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.format.converter.ParquetMetadataConverter; @@ -34,9 +37,21 @@ import java.util.Set; import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; +import static org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; +import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY; // Internal use only public class ParquetReadOptions { + + private static final String ALLOCATION_SIZE = "parquet.read.allocation.size"; + private static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true; private static final boolean STATS_FILTERING_ENABLED_DEFAULT = true; private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true; @@ -61,6 +76,7 @@ public class ParquetReadOptions { private final int maxAllocationSize; private final Map properties; private final FileDecryptionProperties fileDecryptionProperties; + private final ParquetConfiguration conf; ParquetReadOptions(boolean useSignedStringMinMax, boolean useStatsFilter, @@ -77,6 +93,28 @@ public class ParquetReadOptions { int maxAllocationSize, Map properties, FileDecryptionProperties fileDecryptionProperties) { + this(useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter, + usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, 
recordFilter, metadataFilter, + codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties, + new HadoopParquetConfiguration()); + } + + ParquetReadOptions(boolean useSignedStringMinMax, + boolean useStatsFilter, + boolean useDictionaryFilter, + boolean useRecordFilter, + boolean useColumnIndexFilter, + boolean usePageChecksumVerification, + boolean useBloomFilter, + boolean useOffHeapDecryptBuffer, + FilterCompat.Filter recordFilter, + ParquetMetadataConverter.MetadataFilter metadataFilter, + CompressionCodecFactory codecFactory, + ByteBufferAllocator allocator, + int maxAllocationSize, + Map properties, + FileDecryptionProperties fileDecryptionProperties, + ParquetConfiguration conf) { this.useSignedStringMinMax = useSignedStringMinMax; this.useStatsFilter = useStatsFilter; this.useDictionaryFilter = useDictionaryFilter; @@ -92,6 +130,7 @@ public class ParquetReadOptions { this.maxAllocationSize = maxAllocationSize; this.properties = Collections.unmodifiableMap(properties); this.fileDecryptionProperties = fileDecryptionProperties; + this.conf = conf; } public boolean useSignedStringMinMax() { @@ -164,8 +203,16 @@ public boolean isEnabled(String property, boolean defaultValue) { : defaultValue; } + public ParquetConfiguration getConfiguration() { + return conf; + } + public static Builder builder() { - return new Builder(); + return new Builder(new HadoopParquetConfiguration()); + } + + public static Builder builder(ParquetConfiguration conf) { + return new Builder(conf); } public static class Builder { @@ -185,6 +232,31 @@ public static class Builder { protected int maxAllocationSize = ALLOCATION_SIZE_DEFAULT; protected Map properties = new HashMap<>(); protected FileDecryptionProperties fileDecryptionProperties = null; + protected ParquetConfiguration conf; + + public Builder() { + conf = new HadoopParquetConfiguration(); + } + + public Builder(ParquetConfiguration conf) { + this.conf = conf; + useSignedStringMinMax(conf.getBoolean("parquet.strings.signed-min-max.enabled", false)); + useDictionaryFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true)); + useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true)); + useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true)); + useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true)); + usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED, + usePageChecksumVerification)); + useBloomFilter(conf.getBoolean(BLOOM_FILTERING_ENABLED, true)); + useOffHeapDecryptBuffer(conf.getBoolean(OFF_HEAP_DECRYPT_BUFFER_ENABLED, false)); + withCodecFactory(HadoopCodecs.newFactory(conf, 0)); + withRecordFilter(getFilter(conf)); + withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608)); + String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY); + if (badRecordThresh != null) { + set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh); + } + } public Builder useSignedStringMinMax(boolean useSignedStringMinMax) { this.useSignedStringMinMax = useSignedStringMinMax; @@ -325,6 +397,7 @@ public Builder copy(ParquetReadOptions options) { withAllocator(options.allocator); withPageChecksumVerification(options.usePageChecksumVerification); withDecryption(options.fileDecryptionProperties); + conf = options.conf; for (Map.Entry keyValue : options.properties.entrySet()) { set(keyValue.getKey(), keyValue.getValue()); } @@ -333,13 +406,17 @@ public Builder copy(ParquetReadOptions options) { public ParquetReadOptions build() { if (codecFactory == null) { - codecFactory = 
HadoopCodecs.newFactory(0); + if (conf == null) { + codecFactory = HadoopCodecs.newFactory(0); + } else { + codecFactory = HadoopCodecs.newFactory(conf, 0); + } } return new ParquetReadOptions( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter, - codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties); + codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties, conf); } } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/conf/HadoopParquetConfiguration.java b/parquet-hadoop/src/main/java/org/apache/parquet/conf/HadoopParquetConfiguration.java new file mode 100644 index 0000000000..26fce1e9b8 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/conf/HadoopParquetConfiguration.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.conf; + +import org.apache.hadoop.conf.Configuration; + +import java.util.Iterator; +import java.util.Map; + +/** + * Implementation of the Parquet configuration interface relying on Hadoop's + * Configuration to aid with interoperability and backwards compatibility. + */ +public class HadoopParquetConfiguration implements ParquetConfiguration { + + private final Configuration configuration; + + public HadoopParquetConfiguration() { + this(true); + } + + public HadoopParquetConfiguration(boolean loadDefaults) { + configuration = new Configuration(loadDefaults); + } + + public HadoopParquetConfiguration(Configuration conf) { + configuration = conf; + } + + public Configuration getConfiguration() { + return configuration; + } + + @Override + public void set(String name, String value) { + configuration.set(name, value); + } + + @Override + public void setLong(String name, long value) { + configuration.setLong(name, value); + } + + @Override + public void setInt(String name, int value) { + configuration.setInt(name, value); + } + + @Override + public void setBoolean(String name, boolean value) { + configuration.setBoolean(name, value); + } + + @Override + public void setStrings(String name, String... 
values) { + configuration.setStrings(name, values); + } + + @Override + public void setClass(String name, Class value, Class xface) { + configuration.setClass(name, value, xface); + } + + @Override + public String get(String name) { + return configuration.get(name); + } + + @Override + public String get(String name, String defaultValue) { + return configuration.get(name, defaultValue); + } + + @Override + public long getLong(String name, long defaultValue) { + return configuration.getLong(name, defaultValue); + } + + @Override + public int getInt(String name, int defaultValue) { + return configuration.getInt(name, defaultValue); + } + + @Override + public boolean getBoolean(String name, boolean defaultValue) { + return configuration.getBoolean(name, defaultValue); + } + + @Override + public String getTrimmed(String name) { + return configuration.getTrimmed(name); + } + + @Override + public String getTrimmed(String name, String defaultValue) { + return configuration.getTrimmed(name, defaultValue); + } + + @Override + public String[] getStrings(String name, String[] defaultValue) { + return configuration.getStrings(name, defaultValue); + } + + @Override + public Class getClass(String name, Class defaultValue) { + return configuration.getClass(name, defaultValue); + } + + @Override + public Class getClass(String name, Class defaultValue, Class xface) { + return configuration.getClass(name, defaultValue, xface); + } + + @Override + public Class getClassByName(String name) throws ClassNotFoundException { + return configuration.getClassByName(name); + } + + @Override + public Iterator> iterator() { + return configuration.iterator(); + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index beb1c75add..5f1a777e30 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -37,8 +37,11 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.codec.ZstandardCodec; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.util.ConfigurationUtil; public class CodecFactory implements CompressionCodecFactory { @@ -48,7 +51,7 @@ public class CodecFactory implements CompressionCodecFactory { private final Map compressors = new HashMap(); private final Map decompressors = new HashMap(); - protected final Configuration configuration; + protected final ParquetConfiguration configuration; protected final int pageSize; /** @@ -61,6 +64,19 @@ public class CodecFactory implements CompressionCodecFactory { * decompressors this parameter has no impact on the function of the factory */ public CodecFactory(Configuration configuration, int pageSize) { + this(new HadoopParquetConfiguration(configuration), pageSize); + } + + /** + * Create a new codec factory. + * + * @param configuration used to pass compression codec configuration information + * @param pageSize the expected page size, does not set a hard limit, currently just + * used to set the initial size of the output stream used when + * compressing a buffer. 
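Both the HadoopParquetConfiguration implementation above and the CodecFactory(ParquetConfiguration, int) constructor being added here need to move between the two configuration types. A short interop sketch (not part of the diff; the property value is illustrative, and the exact unwrapping behaviour is whatever the ConfigurationUtil.createHadoopConfiguration changes elsewhere in this patch define):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.conf.HadoopParquetConfiguration;
    import org.apache.parquet.conf.ParquetConfiguration;
    import org.apache.parquet.hadoop.util.ConfigurationUtil;

    public class HadoopInteropSketch {
      public static void main(String[] args) {
        // Wrap an existing Hadoop Configuration in the new interface type.
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("parquet.compression", "ZSTD"); // illustrative property
        ParquetConfiguration conf = new HadoopParquetConfiguration(hadoopConf);
        System.out.println(conf.get("parquet.compression")); // values set on the wrapped object are visible

        // Unwrap again where a Hadoop API (ReflectionUtils, codecs, output formats) still needs
        // a Configuration; this helper comes from the ConfigurationUtil changes in this patch.
        Configuration unwrapped = ConfigurationUtil.createHadoopConfiguration(conf);
        System.out.println(unwrapped.get("parquet.compression"));
      }
    }

This wrap/unwrap path is the backwards-compatibility bridge the rest of the patch relies on: Hadoop-facing call sites keep working, while new call sites can stay on the interface.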
If this factory is only used to construct + * decompressors this parameter has no impact on the function of the factory + */ + public CodecFactory(ParquetConfiguration configuration, int pageSize) { this.configuration = configuration; this.pageSize = pageSize; } @@ -246,9 +262,9 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) { codecClass = Class.forName(codecClassName); } catch (ClassNotFoundException e) { // Try to load the class using the job classloader - codecClass = configuration.getClassLoader().loadClass(codecClassName); + codecClass = new Configuration(false).getClassLoader().loadClass(codecClassName); } - codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration); + codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, ConfigurationUtil.createHadoopConfiguration(configuration)); CODEC_BY_NAME.put(codecCacheKey, codec); return codec; } catch (ClassNotFoundException e) { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java index 1532e83dfe..d6b92faffd 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java @@ -25,6 +25,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.codec.ZstdDecompressorStream; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -51,6 +53,10 @@ class DirectZstd { static CodecFactory.BytesCompressor createCompressor(Configuration conf, int pageSize) { + return createCompressor(new HadoopParquetConfiguration(conf), pageSize); + } + + static CodecFactory.BytesCompressor createCompressor(ParquetConfiguration conf, int pageSize) { return new ZstdCompressor( getPool(conf), conf.getInt(PARQUET_COMPRESS_ZSTD_LEVEL, DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL), @@ -59,6 +65,10 @@ static CodecFactory.BytesCompressor createCompressor(Configuration conf, int pag } static CodecFactory.BytesDecompressor createDecompressor(Configuration conf) { + return createDecompressor(new HadoopParquetConfiguration(conf)); + } + + static CodecFactory.BytesDecompressor createDecompressor(ParquetConfiguration conf) { return new ZstdDecompressor(getPool(conf)); } @@ -135,6 +145,10 @@ BytesInput getBytesInput() { } private static BufferPool getPool(Configuration conf) { + return getPool(new HadoopParquetConfiguration(conf)); + } + + private static BufferPool getPool(ParquetConfiguration conf) { if (conf.getBoolean(PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)) { return RecyclingBufferPool.INSTANCE; } else { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java index 8203e9098d..d94d5d2ef4 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.PrimitiveIterator; import java.util.Set; @@ -32,6 +33,7 @@ 
import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.filter.UnboundRecordFilter; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.FilterCompat.Filter; @@ -167,13 +169,13 @@ public float getProgress() throws IOException, InterruptedException { public void initialize(ParquetFileReader reader, ParquetReadOptions options) { // copy custom configuration to the Configuration passed to the ReadSupport - Configuration conf = new Configuration(); - if (options instanceof HadoopReadOptions) { - conf = ((HadoopReadOptions) options).getConf(); - } + ParquetConfiguration conf = Objects.requireNonNull(options).getConfiguration(); for (String property : options.getPropertyNames()) { conf.set(property, options.getProperty(property)); } + for (Map.Entry property : new Configuration()) { + conf.set(property.getKey(), property.getValue()); + } // initialize a ReadContext for this file this.reader = reader; @@ -261,7 +263,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException { LOG.debug("read value: {}", currentValue); } catch (RuntimeException e) { - throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, reader.getPath()), e); + throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, reader.getFile()), e); } } return true; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java index 1bfd4b20f0..3e96535794 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java @@ -52,6 +52,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.parquet.Preconditions; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.filter.UnboundRecordFilter; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.FilterCompat.Filter; @@ -211,6 +213,19 @@ private static UnboundRecordFilter getUnboundRecordFilterInstance(Configuration } } + private static UnboundRecordFilter getUnboundRecordFilterInstance(ParquetConfiguration configuration) { + Class clazz = ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class); + if (clazz == null) { + return null; + } + try { + return (UnboundRecordFilter) clazz.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new BadConfigurationException( + "could not instantiate unbound record filter class", e); + } + } + public static void setReadSupportClass(JobConf conf, Class readSupportClass) { conf.set(READ_SUPPORT_CLASS, readSupportClass.getName()); } @@ -232,6 +247,10 @@ public static void setFilterPredicate(Configuration configuration, FilterPredica } private static FilterPredicate getFilterPredicate(Configuration configuration) { + return getFilterPredicate(new HadoopParquetConfiguration(configuration)); + } + + private static FilterPredicate getFilterPredicate(ParquetConfiguration configuration) { try { return SerializationUtil.readObjectFromConfAsBase64(FILTER_PREDICATE, 
configuration); } catch (IOException e) { @@ -247,6 +266,10 @@ private static FilterPredicate getFilterPredicate(Configuration configuration) { * @return a filter for the unbound record filter specified in conf */ public static Filter getFilter(Configuration conf) { + return getFilter(new HadoopParquetConfiguration(conf)); + } + + public static Filter getFilter(ParquetConfiguration conf) { return FilterCompat.get(getFilterPredicate(conf), getUnboundRecordFilterInstance(conf)); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java index f9c8314dd3..785e5d05d8 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java @@ -36,11 +36,14 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.crypto.FileDecryptionProperties; import org.apache.parquet.filter.UnboundRecordFilter; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.FilterCompat.Filter; import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.hadoop.util.HiddenFileFilter; @@ -180,6 +183,10 @@ public static Builder read(InputFile file) throws IOException { return new Builder<>(file); } + public static Builder read(InputFile file, ParquetConfiguration conf) throws IOException { + return new Builder<>(file, conf); + } + public static Builder builder(ReadSupport readSupport, Path path) { return new Builder<>(readSupport, path); } @@ -190,7 +197,7 @@ public static class Builder { private final Path path; private Filter filter = null; private ByteBufferAllocator allocator = new HeapByteBufferAllocator(); - protected Configuration conf; + protected ParquetConfiguration conf; private ParquetReadOptions.Builder optionsBuilder; @Deprecated @@ -198,8 +205,9 @@ private Builder(ReadSupport readSupport, Path path) { this.readSupport = Objects.requireNonNull(readSupport, "readSupport cannot be null"); this.file = null; this.path = Objects.requireNonNull(path, "path cannot be null"); - this.conf = new Configuration(); - this.optionsBuilder = HadoopReadOptions.builder(conf, path); + Configuration hadoopConf = new Configuration(); + this.conf = new HadoopParquetConfiguration(hadoopConf); + this.optionsBuilder = HadoopReadOptions.builder(hadoopConf, path); } @Deprecated @@ -207,8 +215,9 @@ protected Builder(Path path) { this.readSupport = null; this.file = null; this.path = Objects.requireNonNull(path, "path cannot be null"); - this.conf = new Configuration(); - this.optionsBuilder = HadoopReadOptions.builder(conf, path); + Configuration hadoopConf = new Configuration(); + this.conf = new HadoopParquetConfiguration(hadoopConf); + this.optionsBuilder = HadoopReadOptions.builder(hadoopConf, path); } protected Builder(InputFile file) { @@ -217,17 +226,30 @@ protected Builder(InputFile file) { this.path = null; if (file instanceof HadoopInputFile) { HadoopInputFile hadoopFile = (HadoopInputFile) file; - this.conf = hadoopFile.getConfiguration(); - optionsBuilder = 
HadoopReadOptions.builder(conf, hadoopFile.getPath()); + Configuration hadoopConf = hadoopFile.getConfiguration(); + this.conf = new HadoopParquetConfiguration(hadoopConf); + optionsBuilder = HadoopReadOptions.builder(hadoopConf, hadoopFile.getPath()); } else { - this.conf = new Configuration(); - optionsBuilder = HadoopReadOptions.builder(conf); + optionsBuilder = ParquetReadOptions.builder(new HadoopParquetConfiguration()); + } + } + + protected Builder(InputFile file, ParquetConfiguration conf) { + this.readSupport = null; + this.file = Objects.requireNonNull(file, "file cannot be null"); + this.path = null; + this.conf = conf; + if (file instanceof HadoopInputFile) { + HadoopInputFile hadoopFile = (HadoopInputFile) file; + optionsBuilder = HadoopReadOptions.builder(ConfigurationUtil.createHadoopConfiguration(conf), hadoopFile.getPath()); + } else { + optionsBuilder = ParquetReadOptions.builder(conf); } } // when called, resets options to the defaults from conf public Builder withConf(Configuration conf) { - this.conf = Objects.requireNonNull(conf, "conf cannot be null"); + this.conf = new HadoopParquetConfiguration(Objects.requireNonNull(conf, "conf cannot be null")); // previous versions didn't use the builder, so may set filter before conf. this maintains // compatibility for filter. other options are reset by a new conf. @@ -239,6 +261,15 @@ public Builder withConf(Configuration conf) { return this; } + public Builder withConf(ParquetConfiguration conf) { + this.conf = conf; + this.optionsBuilder = ParquetReadOptions.builder(conf); + if (filter != null) { + optionsBuilder.withRecordFilter(filter); + } + return this; + } + public Builder withFilter(Filter filter) { this.filter = filter; optionsBuilder.withRecordFilter(filter); @@ -354,19 +385,20 @@ public ParquetReader build() throws IOException { .build(); if (path != null) { - FileSystem fs = path.getFileSystem(conf); + Configuration hadoopConf = ConfigurationUtil.createHadoopConfiguration(conf); + FileSystem fs = path.getFileSystem(hadoopConf); FileStatus stat = fs.getFileStatus(path); if (stat.isFile()) { return new ParquetReader<>( - Collections.singletonList((InputFile) HadoopInputFile.fromStatus(stat, conf)), + Collections.singletonList((InputFile) HadoopInputFile.fromStatus(stat, hadoopConf)), options, getReadSupport()); } else { List files = new ArrayList<>(); for (FileStatus fileStatus : fs.listStatus(path, HiddenFileFilter.INSTANCE)) { - files.add(HadoopInputFile.fromStatus(fileStatus, conf)); + files.add(HadoopInputFile.fromStatus(fileStatus, hadoopConf)); } return new ParquetReader(files, options, getReadSupport()); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 7400266514..0df1c090c4 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -26,10 +26,13 @@ import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.crypto.FileEncryptionProperties; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.ConfigurationUtil; import 
org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.io.OutputFile; import org.apache.parquet.schema.MessageType; @@ -275,6 +278,30 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport int maxPaddingSize, ParquetProperties encodingProps, FileEncryptionProperties encryptionProperties) throws IOException { + this( + file, + mode, + writeSupport, + compressionCodecName, + rowGroupSize, + validating, + new HadoopParquetConfiguration(conf), + maxPaddingSize, + encodingProps, + encryptionProperties); + } + + ParquetWriter( + OutputFile file, + ParquetFileWriter.Mode mode, + WriteSupport writeSupport, + CompressionCodecName compressionCodecName, + long rowGroupSize, + boolean validating, + ParquetConfiguration conf, + int maxPaddingSize, + ParquetProperties encodingProps, + FileEncryptionProperties encryptionProperties) throws IOException { WriteSupport.WriteContext writeContext = writeSupport.init(conf); MessageType schema = writeContext.getSchema(); @@ -282,8 +309,9 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport // encryptionProperties could be built from the implementation of EncryptionPropertiesFactory when it is attached. if (encryptionProperties == null) { String path = file == null ? null : file.getPath(); - encryptionProperties = ParquetOutputFormat.createEncryptionProperties(conf, - path == null ? null : new Path(path), writeContext); + Configuration hadoopConf = ConfigurationUtil.createHadoopConfiguration(conf); + encryptionProperties = ParquetOutputFormat.createEncryptionProperties( + hadoopConf, path == null ? null : new Path(path), writeContext); } ParquetFileWriter fileWriter = new ParquetFileWriter( @@ -352,7 +380,7 @@ public abstract static class Builder> { private OutputFile file = null; private Path path = null; private FileEncryptionProperties encryptionProperties = null; - private Configuration conf = new Configuration(); + private ParquetConfiguration conf = null; private ParquetFileWriter.Mode mode; private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME; private long rowGroupSize = DEFAULT_BLOCK_SIZE; @@ -380,6 +408,14 @@ protected Builder(OutputFile path) { */ protected abstract WriteSupport getWriteSupport(Configuration conf); + /** + * @param conf a configuration + * @return an appropriate WriteSupport for the object model. + */ + protected WriteSupport getWriteSupport(ParquetConfiguration conf) { + throw new UnsupportedOperationException("Override getWriteSupport(ParquetConfiguration)"); + } + /** * Set the {@link Configuration} used by the constructed writer. * @@ -387,6 +423,17 @@ protected Builder(OutputFile path) { * @return this builder for method chaining. */ public SELF withConf(Configuration conf) { + this.conf = new HadoopParquetConfiguration(conf); + return self(); + } + + /** + * Set the {@link ParquetConfiguration} used by the constructed writer. + * + * @param conf a {@code ParquetConfiguration} + * @return this builder for method chaining. + */ + public SELF withConf(ParquetConfiguration conf) { this.conf = conf; return self(); } @@ -718,6 +765,9 @@ public SELF withStatisticsTruncateLength(int length) { * @return this builder for method chaining. 
*/ public SELF config(String property, String value) { + if (conf == null) { + conf = new HadoopParquetConfiguration(); + } conf.set(property, value); return self(); } @@ -729,12 +779,15 @@ public SELF config(String property, String value) { * @throws IOException if there is an error while creating the writer */ public ParquetWriter build() throws IOException { + if (conf == null) { + conf = new HadoopParquetConfiguration(); + } if (file != null) { return new ParquetWriter<>(file, mode, getWriteSupport(conf), codecName, rowGroupSize, enableValidation, conf, maxPaddingSize, encodingPropsBuilder.build(), encryptionProperties); } else { - return new ParquetWriter<>(HadoopOutputFile.fromPath(path, conf), + return new ParquetWriter<>(HadoopOutputFile.fromPath(path, ConfigurationUtil.createHadoopConfiguration(conf)), mode, getWriteSupport(conf), codecName, rowGroupSize, enableValidation, conf, maxPaddingSize, encodingPropsBuilder.build(), encryptionProperties); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java index 8100a351f3..2593393d77 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; @@ -53,6 +54,15 @@ public RecordMaterializer prepareForRead( return delegate.prepareForRead(configuration, keyValueMetaData, fileSchema, readContext); } + @Override + public RecordMaterializer prepareForRead( + ParquetConfiguration configuration, + Map keyValueMetaData, + MessageType fileSchema, + ReadSupport.ReadContext readContext) { + return delegate.prepareForRead(configuration, keyValueMetaData, fileSchema, readContext); + } + @Override public String toString() { return this.getClass().getName() + "(" + delegate.toString() + ")"; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java index f5bbfc60de..926fe68c63 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.api.RecordConsumer; /** @@ -42,6 +43,11 @@ public WriteSupport.WriteContext init(Configuration configuration) { return delegate.init(configuration); } + @Override + public WriteSupport.WriteContext init(ParquetConfiguration configuration) { + return delegate.init(configuration); + } + @Override public void prepareForWrite(RecordConsumer recordConsumer) { delegate.prepareForWrite(recordConsumer); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java index 6bc5e5d3d4..b2e36c842a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java @@ -25,6 +25,9 @@ import org.apache.hadoop.conf.Configuration; +import 
org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; +import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.schema.MessageType; /** @@ -35,7 +38,7 @@ public class InitContext { private final Map> keyValueMetadata; private Map mergedKeyValueMetadata; - private final Configuration configuration; + private final ParquetConfiguration configuration; private final MessageType fileSchema; /** @@ -47,6 +50,13 @@ public InitContext( Configuration configuration, Map> keyValueMetadata, MessageType fileSchema) { + this(new HadoopParquetConfiguration(configuration), keyValueMetadata, fileSchema); + } + + public InitContext( + ParquetConfiguration configuration, + Map> keyValueMetadata, + MessageType fileSchema) { super(); this.keyValueMetadata = keyValueMetadata; this.configuration = configuration; @@ -77,6 +87,10 @@ public Map getMergedKeyValueMetaData() { * @return the configuration for this job */ public Configuration getConfiguration() { + return ConfigurationUtil.createHadoopConfiguration(configuration); + } + + public ParquetConfiguration getConfig() { return configuration; } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java index 62344522b6..5908bf6ca3 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; @@ -75,6 +76,24 @@ public ReadContext init( throw new UnsupportedOperationException("Override init(InitContext)"); } + /** + * called in {@link org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)} in the front end + * + * @param configuration the configuration + * @param keyValueMetaData the app specific metadata from the file + * @param fileSchema the schema of the file + * @return the readContext that defines how to read the file + * + * @deprecated override {@link ReadSupport#init(InitContext)} instead + */ + @Deprecated + public ReadContext init( + ParquetConfiguration configuration, + Map keyValueMetaData, + MessageType fileSchema) { + throw new UnsupportedOperationException("Override init(InitContext)"); + } + /** * called in {@link org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)} in the front end * @@ -82,7 +101,7 @@ public ReadContext init( * @return the readContext that defines how to read the file */ public ReadContext init(InitContext context) { - return init(context.getConfiguration(), context.getMergedKeyValueMetaData(), context.getFileSchema()); + return init(context.getConfig(), context.getMergedKeyValueMetaData(), context.getFileSchema()); } /** @@ -101,6 +120,24 @@ abstract public RecordMaterializer prepareForRead( MessageType fileSchema, ReadContext readContext); + /** + * called in {@link org.apache.hadoop.mapreduce.RecordReader#initialize(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)} in the back end + * the returned RecordMaterializer will materialize the records and add them to the destination + * + * @param configuration the configuration + * @param keyValueMetaData the app 
specific metadata from the file + * @param fileSchema the schema of the file + * @param readContext returned by the init method + * @return the recordMaterializer that will materialize the records + */ + public RecordMaterializer prepareForRead( + ParquetConfiguration configuration, + Map keyValueMetaData, + MessageType fileSchema, + ReadContext readContext) { + throw new UnsupportedOperationException("Override prepareForRead(ParquetConfiguration, Map, MessageType, ReadContext)"); + } + /** * information to read the file */ diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java index 9549d5f492..a128d29179 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.MessageType; @@ -105,6 +106,15 @@ public Map getExtraMetaData() { */ public abstract WriteContext init(Configuration configuration); + /** + * called first in the task + * @param configuration the job's configuration + * @return the information needed to write the file + */ + public WriteContext init(ParquetConfiguration configuration) { + throw new UnsupportedOperationException("Override init(ParquetConfiguration)"); + } + /** * This will be called once per row group * @param recordConsumer the recordConsumer to write to diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java index 12a67d301d..a151b4fce4 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.example.data.Group; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; @@ -111,6 +112,11 @@ protected Builder self() { @Override protected WriteSupport getWriteSupport(Configuration conf) { + return getWriteSupport((ParquetConfiguration) null); + } + + @Override + protected WriteSupport getWriteSupport(ParquetConfiguration conf) { return new GroupWriteSupport(type, extraMetaData); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java index c49b681d5c..6cb4b6f7fe 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java @@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.convert.GroupRecordConverter; import org.apache.parquet.hadoop.api.ReadSupport; @@ -34,6 +36,13 @@ public class GroupReadSupport extends ReadSupport { public 
org.apache.parquet.hadoop.api.ReadSupport.ReadContext init( Configuration configuration, Map keyValueMetaData, MessageType fileSchema) { + return init(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema); + } + + @Override + public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init( + ParquetConfiguration configuration, Map keyValueMetaData, + MessageType fileSchema) { String partialSchemaString = configuration.get(ReadSupport.PARQUET_READ_SCHEMA); MessageType requestedProjection = getSchemaForRead(fileSchema, partialSchemaString); return new ReadContext(requestedProjection); @@ -46,4 +55,11 @@ public RecordMaterializer prepareForRead(Configuration configuration, return new GroupRecordConverter(readContext.getRequestedSchema()); } + @Override + public RecordMaterializer prepareForRead(ParquetConfiguration configuration, + Map keyValueMetaData, MessageType fileSchema, + org.apache.parquet.hadoop.api.ReadSupport.ReadContext readContext) { + return new GroupRecordConverter(readContext.getRequestedSchema()); + } + } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java index dfed676c9c..a4d4a3f36d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java @@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.GroupWriter; import org.apache.parquet.hadoop.api.WriteSupport; @@ -41,6 +43,10 @@ public static void setSchema(MessageType schema, Configuration configuration) { } public static MessageType getSchema(Configuration configuration) { + return getSchema(new HadoopParquetConfiguration(configuration)); + } + + public static MessageType getSchema(ParquetConfiguration configuration) { return parseMessageType(Objects.requireNonNull(configuration.get(PARQUET_EXAMPLE_SCHEMA), PARQUET_EXAMPLE_SCHEMA)); } @@ -68,6 +74,11 @@ public String getName() { @Override public org.apache.parquet.hadoop.api.WriteSupport.WriteContext init(Configuration configuration) { + return init(new HadoopParquetConfiguration(configuration)); + } + + @Override + public org.apache.parquet.hadoop.api.WriteSupport.WriteContext init(ParquetConfiguration configuration) { // if present, prefer the schema passed to the constructor if (schema == null) { schema = getSchema(configuration); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java index 7f39cd76c6..ca524d90b6 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java @@ -19,18 +19,26 @@ package org.apache.parquet.hadoop.util; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.BadConfigurationException; +import java.util.Map; + public class ConfigurationUtil { public static Class getClassFromConfig(Configuration configuration, String configName, Class assignableFrom) { + return 
getClassFromConfig(new HadoopParquetConfiguration(configuration), configName, assignableFrom); + } + + public static Class getClassFromConfig(ParquetConfiguration configuration, String configName, Class assignableFrom) { final String className = configuration.get(configName); if (className == null) { return null; } - + try { - final Class foundClass = configuration.getClassByName(className); + final Class foundClass = configuration.getClassByName(className); if (!assignableFrom.isAssignableFrom(foundClass)) { throw new BadConfigurationException("class " + className + " set in job conf at " + configName + " is not a subclass of " + assignableFrom.getCanonicalName()); @@ -41,4 +49,18 @@ public static Class getClassFromConfig(Configuration configuration, String co } } + public static Configuration createHadoopConfiguration(ParquetConfiguration conf) { + if (conf == null) { + return new Configuration(); + } + if (conf instanceof HadoopParquetConfiguration) { + return ((HadoopParquetConfiguration) conf).getConfiguration(); + } + Configuration configuration = new Configuration(); + for (Map.Entry entry : conf) { + configuration.set(entry.getKey(), entry.getValue()); + } + return configuration; + } + } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java index a46c8db216..845cafc5aa 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.CodecFactory; public class HadoopCodecs { @@ -33,6 +34,10 @@ public static CompressionCodecFactory newFactory(Configuration conf, int sizeHin return new CodecFactory(conf, sizeHint); } + public static CompressionCodecFactory newFactory(ParquetConfiguration conf, int sizeHint) { + return new CodecFactory(conf, sizeHint); + } + public static CompressionCodecFactory newDirectFactory(Configuration conf, ByteBufferAllocator allocator, int sizeHint) { return CodecFactory.createDirectCodecFactory(conf, allocator, sizeHint); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java index 199b774c43..6e669ed8ba 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java @@ -29,6 +29,8 @@ import java.util.zip.GZIPOutputStream; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; /** * Serialization utils copied from: @@ -70,8 +72,22 @@ public static void writeObjectToConfAsBase64(String key, Object obj, Configurati * @return the read object, or null if key is not present in conf * @throws IOException if there is an error while reading */ - @SuppressWarnings("unchecked") public static T readObjectFromConfAsBase64(String key, Configuration conf) throws IOException { + return readObjectFromConfAsBase64(key, new HadoopParquetConfiguration(conf)); + } + + /** + * Reads an object (that was written using + * {@link 
#writeObjectToConfAsBase64}) from a configuration + * + * @param key for the configuration + * @param conf to read from + * @param the Java type of the deserialized object + * @return the read object, or null if key is not present in conf + * @throws IOException if there is an error while reading + */ + @SuppressWarnings("unchecked") + public static T readObjectFromConfAsBase64(String key, ParquetConfiguration conf) throws IOException { String b64 = conf.get(key); if (b64 == null) { return null; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java index 074d2e8b66..e8e032c9cd 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java @@ -25,6 +25,7 @@ import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.conf.ParquetConfiguration; import org.junit.Rule; import org.junit.rules.TemporaryFolder; import org.apache.parquet.hadoop.ParquetWriter; @@ -86,6 +87,11 @@ protected DirectWriteSupport(MessageType type, DirectWriter writer, @Override public WriteContext init(Configuration configuration) { + return init((ParquetConfiguration) null); + } + + @Override + public WriteContext init(ParquetConfiguration configuration) { return new WriteContext(type, metadata); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java index 862ae672c6..a7c2002832 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java @@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.crypto.EncryptionPropertiesFactory; import org.apache.parquet.crypto.ParquetCipher; import org.apache.parquet.example.data.Group; @@ -203,6 +205,11 @@ public CryptoGroupWriteSupport() { @Override public WriteContext init(Configuration conf) { + return init(new HadoopParquetConfiguration(conf)); + } + + @Override + public WriteContext init(ParquetConfiguration conf) { WriteContext writeContext = super.init(conf); MessageType schema = writeContext.getSchema(); List columns = schema.getColumns(); @@ -219,6 +226,10 @@ public WriteContext init(Configuration conf) { } private void setMetadata(ColumnDescriptor column, Configuration conf) { + setMetadata(column, new HadoopParquetConfiguration(conf)); + } + + private void setMetadata(ColumnDescriptor column, ParquetConfiguration conf) { String columnShortName = column.getPath()[column.getPath().length - 1]; if (cryptoMetadata.containsKey(columnShortName) && cryptoMetadata.get(columnShortName).get("columnKeyMetaData") != null) { @@ -242,6 +253,11 @@ protected Builder self() { @Override protected WriteSupport getWriteSupport(Configuration conf) { + return getWriteSupport((ParquetConfiguration) null); + } + + @Override + protected WriteSupport getWriteSupport(ParquetConfiguration conf) { return new CryptoGroupWriteSupport(); } } diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java index a212c091fe..991d588838 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java @@ -316,7 +316,7 @@ public void testWriteReadEncryptedParquetFiles() throws IOException { Path rootPath = new Path(temporaryFolder.getRoot().getPath()); LOG.info("======== testWriteReadEncryptedParquetFiles {} ========", rootPath.toString()); byte[] AADPrefix = AAD_PREFIX_STRING.getBytes(StandardCharsets.UTF_8); - // Write using various encryption configuraions + // Write using various encryption configurations testWriteEncryptedParquetFiles(rootPath, DATA); // Read using various decryption configurations. testReadEncryptedParquetFiles(rootPath, DATA); diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java index 50f9ebcc3d..6ff68f3866 100644 --- a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java +++ b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java @@ -27,6 +27,8 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.pig.LoadPushDown.RequiredFieldList; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.FrontendException; @@ -61,6 +63,14 @@ public class TupleReadSupport extends ReadSupport { * @return the pig schema requested by the user or null if none. */ static Schema getPigSchema(Configuration configuration) { + return getPigSchema(new HadoopParquetConfiguration(configuration)); + } + + /** + * @param configuration the configuration + * @return the pig schema requested by the user or null if none. 
+ */ + static Schema getPigSchema(ParquetConfiguration configuration) { return parsePigSchema(configuration.get(PARQUET_PIG_SCHEMA)); } @@ -69,9 +79,17 @@ static Schema getPigSchema(Configuration configuration) { * @return List of required fields from pushProjection */ static RequiredFieldList getRequiredFields(Configuration configuration) { + return getRequiredFields(new HadoopParquetConfiguration(configuration)); + } + + /** + * @param configuration configuration + * @return List of required fields from pushProjection + */ + static RequiredFieldList getRequiredFields(ParquetConfiguration configuration) { String requiredFieldString = configuration.get(PARQUET_PIG_REQUIRED_FIELDS); - if(requiredFieldString == null) { + if (requiredFieldString == null) { return null; } @@ -154,9 +172,9 @@ private static FieldSchema union(FieldSchema mergedFieldSchema, FieldSchema newF @Override public ReadContext init(InitContext initContext) { - Schema pigSchema = getPigSchema(initContext.getConfiguration()); - RequiredFieldList requiredFields = getRequiredFields(initContext.getConfiguration()); - boolean columnIndexAccess = initContext.getConfiguration().getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false); + Schema pigSchema = getPigSchema(initContext.getConfig()); + RequiredFieldList requiredFields = getRequiredFields(initContext.getConfig()); + boolean columnIndexAccess = initContext.getConfig().getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false); if (pigSchema == null) { return new ReadContext(initContext.getFileSchema()); @@ -174,9 +192,17 @@ public RecordMaterializer prepareForRead( Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) { + return prepareForRead(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema, readContext); + } + + @Override + public RecordMaterializer prepareForRead( + ParquetConfiguration configuration, + Map keyValueMetaData, + MessageType fileSchema, + ReadContext readContext) { MessageType requestedSchema = readContext.getRequestedSchema(); Schema requestedPigSchema = getPigSchema(configuration); - if (requestedPigSchema == null) { throw new ParquetDecodingException("Missing Pig schema: ParquetLoader sets the schema in the job conf"); } diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java index 68a7d7d22b..fd1bb39cdf 100644 --- a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java +++ b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java @@ -26,6 +26,8 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; @@ -82,6 +84,11 @@ public MessageType getParquetSchema() { @Override public WriteContext init(Configuration configuration) { + return init(new HadoopParquetConfiguration(configuration)); + } + + @Override + public WriteContext init(ParquetConfiguration configuration) { Map extraMetaData = new HashMap(); new PigMetaData(rootPigSchema).addToMetaData(extraMetaData); return new WriteContext(rootSchema, extraMetaData); diff --git a/parquet-pig/src/test/java/org/apache/parquet/pig/TestTupleRecordConsumer.java b/parquet-pig/src/test/java/org/apache/parquet/pig/TestTupleRecordConsumer.java index ff4bd87d64..1c21044cb0 100644 --- 
a/parquet-pig/src/test/java/org/apache/parquet/pig/TestTupleRecordConsumer.java +++ b/parquet-pig/src/test/java/org/apache/parquet/pig/TestTupleRecordConsumer.java @@ -33,6 +33,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.schema.Schema; @@ -175,7 +176,7 @@ private void testFromGroups(String pigSchemaString, List input) throws Pa private TupleWriteSupport newTupleWriter(String pigSchemaString, RecordMaterializer recordConsumer) throws ParserException { TupleWriteSupport tupleWriter = TupleWriteSupport.fromPigSchema(pigSchemaString); - tupleWriter.init(null); + tupleWriter.init((ParquetConfiguration) null); tupleWriter.prepareForWrite( new ConverterConsumer(recordConsumer.getRootConverter(), tupleWriter.getParquetSchema()) ); diff --git a/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java b/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java index c8e36aded9..12c3373f5a 100644 --- a/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java +++ b/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java @@ -23,6 +23,7 @@ import java.util.logging.Level; import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; import org.apache.pig.data.NonSpillableDataBag; @@ -130,8 +131,8 @@ private static void read(PageReadStore columns, String pigSchemaString, String m TupleReadSupport tupleReadSupport = new TupleReadSupport(); Map pigMetaData = pigMetaData(pigSchemaString); MessageType schema = new PigSchemaConverter().convert(Utils.getSchemaFromString(pigSchemaString)); - ReadContext init = tupleReadSupport.init(null, pigMetaData, schema); - RecordMaterializer recordConsumer = tupleReadSupport.prepareForRead(null, pigMetaData, schema, init); + ReadContext init = tupleReadSupport.init((ParquetConfiguration) null, pigMetaData, schema); + RecordMaterializer recordConsumer = tupleReadSupport.prepareForRead((ParquetConfiguration) null, pigMetaData, schema, init); RecordReader recordReader = columnIO.getRecordReader(columns, recordConsumer); // TODO: put this back // if (DEBUG) { @@ -156,7 +157,7 @@ private static Map pigMetaData(String pigSchemaString) { private static void write(MemPageStore memPageStore, ColumnWriteStoreV1 columns, MessageType schema, String pigSchemaString) throws ExecException, ParserException { MessageColumnIO columnIO = newColumnFactory(pigSchemaString); TupleWriteSupport tupleWriter = TupleWriteSupport.fromPigSchema(pigSchemaString); - tupleWriter.init(null); + tupleWriter.init((ParquetConfiguration) null); tupleWriter.prepareForWrite(columnIO.getRecordWriter(columns)); write(memPageStore, tupleWriter, 10000); write(memPageStore, tupleWriter, 10000); diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index 8383fbc75f..da51788f2a 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -25,6 +25,8 @@ import com.twitter.elephantbird.util.Protobufs; import org.apache.hadoop.conf.Configuration; 
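Aside (illustrative, not part of the diff): with both init(Configuration) and init(ParquetConfiguration) overloads on WriteSupport, a bare null argument is ambiguous at compile time, which is why the updated tests cast the null as shown above.

    // tupleWriter.init(null);                       // ambiguous once both overloads exist
    tupleWriter.init((ParquetConfiguration) null);   // disambiguating cast used in the tests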
import org.apache.parquet.column.Dictionary; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.BadConfigurationException; import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.ParquetDecodingException; @@ -68,7 +70,7 @@ public void add(Object value) { } }; - protected final Configuration conf; + protected final ParquetConfiguration conf; protected final Converter[] converters; protected final ParentValueContainer parent; protected final Message.Builder myBuilder; @@ -88,8 +90,16 @@ public void add(Object value) { this(conf, pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, extraMetadata); } + ProtoMessageConverter(ParquetConfiguration conf, ParentValueContainer pvc, Class protoClass, GroupType parquetSchema, Map extraMetadata) { + this(conf, pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, extraMetadata); + } + // For usage in message arrays ProtoMessageConverter(Configuration conf, ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, Map extraMetadata) { + this(new HadoopParquetConfiguration(conf), pvc, builder, parquetSchema, extraMetadata); + } + + ProtoMessageConverter(ParquetConfiguration conf, ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, Map extraMetadata) { if (pvc == null) { throw new IllegalStateException("Missing parent value container"); } @@ -141,7 +151,12 @@ private void validateProtoField(boolean ignoreUnknownFields, private Converter dummyScalarConverter(ParentValueContainer pvc, Type parquetField, Configuration conf, Map extraMetadata) { + return dummyScalarConverter(pvc, parquetField, new HadoopParquetConfiguration(conf), extraMetadata); + } + private Converter dummyScalarConverter(ParentValueContainer pvc, + Type parquetField, ParquetConfiguration conf, + Map extraMetadata) { if (parquetField.isPrimitive()) { PrimitiveType primitiveType = parquetField.asPrimitiveType(); PrimitiveType.PrimitiveTypeName primitiveTypeName = primitiveType.getPrimitiveTypeName(); diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java index a85b4ef555..f1bd64466e 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java @@ -21,6 +21,7 @@ import com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; import org.apache.hadoop.fs.Path; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -120,8 +121,15 @@ public Builder withMessage(Class protoMessage){ return this; } - protected WriteSupport getWriteSupport(Configuration conf) { - return (WriteSupport) ProtoParquetWriter.writeSupport(protoMessage); - } + @Override + protected WriteSupport getWriteSupport(Configuration conf) { + return getWriteSupport((ParquetConfiguration) null); + } + + @Override + @SuppressWarnings("unchecked") + protected WriteSupport getWriteSupport(ParquetConfiguration conf) { + return (WriteSupport) ProtoParquetWriter.writeSupport(protoMessage); + } } } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java 
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java index 78edf70d2e..5a4568daae 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java @@ -21,6 +21,8 @@ import com.google.protobuf.Message; import com.twitter.elephantbird.util.Protobufs; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.api.InitContext; import org.apache.parquet.hadoop.api.ReadSupport; import org.apache.parquet.io.api.RecordMaterializer; @@ -59,7 +61,7 @@ public static void setProtobufClass(Configuration configuration, String protobuf @Override public ReadContext init(InitContext context) { - String requestedProjectionString = context.getConfiguration().get(PB_REQUESTED_PROJECTION); + String requestedProjectionString = context.getConfig().get(PB_REQUESTED_PROJECTION); if (requestedProjectionString != null && !requestedProjectionString.trim().isEmpty()) { MessageType requestedProjection = getSchemaForRead(context.getFileSchema(), requestedProjectionString); @@ -74,6 +76,11 @@ public ReadContext init(InitContext context) { @Override public RecordMaterializer prepareForRead(Configuration configuration, Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) { + return prepareForRead(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema, readContext); + } + + @Override + public RecordMaterializer prepareForRead(ParquetConfiguration configuration, Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) { String headerProtoClass = keyValueMetaData.get(PB_CLASS); String configuredProtoClass = configuration.get(PB_CLASS); diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java index 75a67f12cf..4ddf23d6eb 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java @@ -22,6 +22,7 @@ import com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.schema.MessageType; import java.util.Collections; @@ -54,6 +55,11 @@ public ProtoRecordConverter(Configuration conf, Class protocl reusedBuilder = getBuilder(); } + public ProtoRecordConverter(ParquetConfiguration conf, Class protoclass, MessageType parquetSchema, Map extraMetadata) { + super(conf, new SkipParentValueContainer(), protoclass, parquetSchema, extraMetadata); + reusedBuilder = getBuilder(); + } + public ProtoRecordConverter(Configuration conf, Message.Builder builder, MessageType parquetSchema, Map extraMetadata) { super(conf, new SkipParentValueContainer(), builder, parquetSchema, extraMetadata); reusedBuilder = getBuilder(); diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java index dd77ca6b61..63640d3300 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java @@ -21,6 +21,8 @@ import 
com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; @@ -32,6 +34,10 @@ class ProtoRecordMaterializer extends RecordMaterial private final ProtoRecordConverter root; public ProtoRecordMaterializer(Configuration conf, MessageType requestedSchema, Class protobufClass, Map metadata) { + this(new HadoopParquetConfiguration(conf), requestedSchema, protobufClass, metadata); + } + + public ProtoRecordMaterializer(ParquetConfiguration conf, MessageType requestedSchema, Class protobufClass, Map metadata) { this.root = new ProtoRecordConverter(conf, protobufClass, requestedSchema, metadata); } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java index c3570323f0..a6a779d074 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java @@ -25,6 +25,8 @@ import com.google.protobuf.Message; import com.twitter.elephantbird.util.Protobufs; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; @@ -81,9 +83,19 @@ public ProtoSchemaConverter(boolean parquetSpecsCompliant) { * Instantiate a schema converter to get the parquet schema corresponding to protobuf classes. * Returns instances that are not specs compliant and limited to 5 levels of recursion depth. * - * @param config Hadoop configuration object to parrse parquetSpecsCompliant and maxRecursion settings. + * @param config Hadoop configuration object to parse parquetSpecsCompliant and maxRecursion settings. */ public ProtoSchemaConverter(Configuration config) { + this(new HadoopParquetConfiguration(config)); + } + + /** + * Instantiate a schema converter to get the parquet schema corresponding to protobuf classes. + * Returns instances that are not specs compliant and limited to 5 levels of recursion depth. + * + * @param config Parquet configuration object to parse parquetSpecsCompliant and maxRecursion settings. 
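Illustrative sketch (not part of the diff): converting a protobuf class to a Parquet schema through the new ParquetConfiguration-based constructor. MyMessage is a placeholder protobuf class, and the fragment assumes the usual proto and parquet imports plus accessibility of the PB_SPECS_COMPLIANT_WRITE constant.

    ParquetConfiguration conf = new HadoopParquetConfiguration();
    // Read back via config.getBoolean(ProtoWriteSupport.PB_SPECS_COMPLIANT_WRITE, false) below.
    conf.set(ProtoWriteSupport.PB_SPECS_COMPLIANT_WRITE, "true");
    MessageType parquetSchema = new ProtoSchemaConverter(conf).convert(MyMessage.class);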
+ */ + public ProtoSchemaConverter(ParquetConfiguration config) { this( config.getBoolean(ProtoWriteSupport.PB_SPECS_COMPLIANT_WRITE, false), config.getInt(PB_MAX_RECURSION, 5)); diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java index f15511062a..b13acd2a57 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java @@ -23,6 +23,8 @@ import com.google.protobuf.Descriptors.FieldDescriptor; import com.twitter.elephantbird.util.Protobufs; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.BadConfigurationException; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.io.InvalidRecordException; @@ -118,6 +120,11 @@ public void prepareForWrite(RecordConsumer recordConsumer) { @Override public WriteContext init(Configuration configuration) { + return init(new HadoopParquetConfiguration(configuration)); + } + + @Override + public WriteContext init(ParquetConfiguration configuration) { Map extraMetaData = new HashMap<>(); diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java index 0c3fe440d9..43337052e6 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java @@ -18,6 +18,8 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.thrift.TBase; import com.twitter.elephantbird.pig.util.ThriftToPig; @@ -40,14 +42,22 @@ public abstract class AbstractThriftWriteSupport extends WriteSupport { public static final String PARQUET_THRIFT_CLASS = "parquet.thrift.class"; private static final Logger LOG = LoggerFactory.getLogger(AbstractThriftWriteSupport.class); - private static Configuration conf; + private static ParquetConfiguration conf; public static void setGenericThriftClass(Configuration configuration, Class thriftClass) { + setGenericThriftClass(new HadoopParquetConfiguration(configuration), thriftClass); + } + + public static void setGenericThriftClass(ParquetConfiguration configuration, Class thriftClass) { conf = configuration; configuration.set(PARQUET_THRIFT_CLASS, thriftClass.getName()); } public static Class getGenericThriftClass(Configuration configuration) { + return getGenericThriftClass(new HadoopParquetConfiguration(configuration)); + } + + public static Class getGenericThriftClass(ParquetConfiguration configuration) { final String thriftClassName = configuration.get(PARQUET_THRIFT_CLASS); if (thriftClassName == null) { throw new BadConfigurationException("the thrift class conf is missing in job conf at " + PARQUET_THRIFT_CLASS); @@ -111,9 +121,14 @@ protected boolean isPigLoaded() { @Override public WriteContext init(Configuration configuration) { + return init(new HadoopParquetConfiguration(configuration)); + } + + @Override + public WriteContext init(ParquetConfiguration configuration) { conf = configuration; if (writeContext == null) { - 
init(getGenericThriftClass(configuration)); + init((Class) getGenericThriftClass(configuration)); } return writeContext; } diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java index c1ece9fcfb..60dfc12e77 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java @@ -16,6 +16,8 @@ package org.apache.parquet.hadoop.thrift; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.thrift.TBase; import org.apache.thrift.TException; @@ -25,14 +27,22 @@ public class TBaseWriteSupport> extends AbstractThriftWriteSupport { - private static Configuration conf; + private static ParquetConfiguration conf; public static > void setThriftClass(Configuration configuration, Class thriftClass) { + setThriftClass(new HadoopParquetConfiguration(configuration), thriftClass); + } + + public static > void setThriftClass(ParquetConfiguration configuration, Class thriftClass) { conf = configuration; AbstractThriftWriteSupport.setGenericThriftClass(configuration, thriftClass); } public static Class> getThriftClass(Configuration configuration) { + return getThriftClass(new HadoopParquetConfiguration(configuration)); + } + + public static Class> getThriftClass(ParquetConfiguration configuration) { return (Class>)AbstractThriftWriteSupport.getGenericThriftClass(configuration); } diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java index 6b9d75d98f..6c9311f4ec 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java @@ -24,6 +24,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; @@ -57,6 +59,10 @@ public static void setTProtocolClass(Configuration conf, C } public static Class getTProtocolFactoryClass(Configuration conf) { + return getTProtocolFactoryClass(new HadoopParquetConfiguration(conf)); + } + + public static Class getTProtocolFactoryClass(ParquetConfiguration conf) { final String tProtocolClassName = conf.get(PARQUET_PROTOCOL_CLASS); if (tProtocolClassName == null) { throw new BadConfigurationException("the protocol class conf is missing in job conf at " + PARQUET_PROTOCOL_CLASS); @@ -80,7 +86,7 @@ public static Class getTProtocolFactoryClass(Configuration con private StructType thriftStruct; private ParquetWriteProtocol parquetWriteProtocol; private final FieldIgnoredHandler errorHandler; - private Configuration configuration; + private ParquetConfiguration configuration; public ThriftBytesWriteSupport() { this.buffered = true; @@ -106,6 +112,15 @@ public ThriftBytesWriteSupport( Class> thriftClass, boolean buffered, FieldIgnoredHandler errorHandler) { + this(new HadoopParquetConfiguration(configuration), protocolFactory, thriftClass, buffered, errorHandler); + } + + public 
ThriftBytesWriteSupport( + ParquetConfiguration configuration, + TProtocolFactory protocolFactory, + Class> thriftClass, + boolean buffered, + FieldIgnoredHandler errorHandler) { super(); this.configuration = configuration; this.protocolFactory = protocolFactory; @@ -124,6 +139,11 @@ public String getName() { @Override public WriteContext init(Configuration configuration) { + return init(new HadoopParquetConfiguration(configuration)); + } + + @Override + public WriteContext init(ParquetConfiguration configuration) { this.configuration = configuration; if (this.protocolFactory == null) { try { diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java index 2375a6df6c..30cf0986e1 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java @@ -25,6 +25,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.thrift.TBase; import org.apache.thrift.protocol.TProtocol; @@ -111,6 +113,10 @@ public static void setStrictFieldProjectionFilter(Configuration conf, String sem } public static FieldProjectionFilter getFieldProjectionFilter(Configuration conf) { + return getFieldProjectionFilter(new HadoopParquetConfiguration(conf)); + } + + public static FieldProjectionFilter getFieldProjectionFilter(ParquetConfiguration conf) { String deprecated = conf.get(THRIFT_COLUMN_FILTER_KEY); String strict = conf.get(STRICT_THRIFT_COLUMN_FILTER_KEY); @@ -155,7 +161,7 @@ public ThriftReadSupport(Class thriftClass) { @Override public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(InitContext context) { - final Configuration configuration = context.getConfiguration(); + final ParquetConfiguration configuration = context.getConfig(); final MessageType fileMessageType = context.getFileSchema(); MessageType requestedProjection = fileMessageType; String partialSchemaString = configuration.get(ReadSupport.PARQUET_READ_SCHEMA); @@ -185,9 +191,14 @@ public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(InitContext co return new ReadContext(schemaForRead); } - @SuppressWarnings("unchecked") protected MessageType getProjectedSchema(Configuration configuration, FieldProjectionFilter fieldProjectionFilter) { + return getProjectedSchema(new HadoopParquetConfiguration(configuration), fieldProjectionFilter); + } + + @SuppressWarnings("unchecked") + protected MessageType getProjectedSchema(ParquetConfiguration configuration, FieldProjectionFilter + fieldProjectionFilter) { return new ThriftSchemaConverter(configuration, fieldProjectionFilter) .convert((Class>)thriftClass); } @@ -200,8 +211,12 @@ protected MessageType getProjectedSchema(FieldProjectionFilter .convert((Class>)thriftClass); } - @SuppressWarnings("unchecked") private void initThriftClassFromMultipleFiles(Map> fileMetadata, Configuration conf) throws ClassNotFoundException { + initThriftClassFromMultipleFiles(fileMetadata, new HadoopParquetConfiguration(conf)); + } + + @SuppressWarnings("unchecked") + private void initThriftClassFromMultipleFiles(Map> fileMetadata, ParquetConfiguration conf) throws ClassNotFoundException { if (thriftClass != null) { return; } @@ -216,8 +231,12 @@ private void initThriftClassFromMultipleFiles(Map> 
fileMetad thriftClass = (Class)Class.forName(className); } - @SuppressWarnings("unchecked") private void initThriftClass(ThriftMetaData metadata, Configuration conf) throws ClassNotFoundException { + initThriftClass(metadata, new HadoopParquetConfiguration(conf)); + } + + @SuppressWarnings("unchecked") + private void initThriftClass(ThriftMetaData metadata, ParquetConfiguration conf) throws ClassNotFoundException { if (thriftClass != null) { return; } @@ -254,13 +273,48 @@ public RecordMaterializer prepareForRead(Configuration configuration, configuration); } - @SuppressWarnings("unchecked") + @Override + public RecordMaterializer prepareForRead(ParquetConfiguration configuration, + Map keyValueMetaData, MessageType fileSchema, + org.apache.parquet.hadoop.api.ReadSupport.ReadContext readContext) { + ThriftMetaData thriftMetaData = ThriftMetaData.fromExtraMetaData(keyValueMetaData); + try { + initThriftClass(thriftMetaData, configuration); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot find Thrift object class for metadata: " + thriftMetaData, e); + } + + // if there was not metadata in the file, get it from requested class + if (thriftMetaData == null) { + thriftMetaData = ThriftMetaData.fromThriftClass(thriftClass); + } + + String converterClassName = configuration.get(RECORD_CONVERTER_CLASS_KEY, RECORD_CONVERTER_DEFAULT); + return getRecordConverterInstance(converterClassName, thriftClass, + readContext.getRequestedSchema(), thriftMetaData.getDescriptor(), + configuration); + } + private static ThriftRecordConverter getRecordConverterInstance( String converterClassName, Class thriftClass, MessageType requestedSchema, StructType descriptor, Configuration conf) { - Class> converterClass; + return getRecordConverterInstance(converterClassName, thriftClass, requestedSchema, descriptor, conf, Configuration.class); + } + + private static ThriftRecordConverter getRecordConverterInstance( + String converterClassName, Class thriftClass, + MessageType requestedSchema, StructType descriptor, ParquetConfiguration conf) { + return getRecordConverterInstance(converterClassName, thriftClass, requestedSchema, descriptor, conf, ParquetConfiguration.class); + } + + @SuppressWarnings("unchecked") + private static ThriftRecordConverter getRecordConverterInstance( + String converterClassName, Class thriftClass, + MessageType requestedSchema, StructType descriptor, T2 conf, Class confClass) { + + Class> converterClass; try { - converterClass = (Class>) Class.forName(converterClassName); + converterClass = (Class>) Class.forName(converterClassName); } catch (ClassNotFoundException e) { throw new RuntimeException("Cannot find Thrift converter class: " + converterClassName, e); } @@ -268,15 +322,15 @@ private static ThriftRecordConverter getRecordConverterInstance( try { // first try the new version that accepts a Configuration try { - Constructor> constructor = - converterClass.getConstructor(Class.class, MessageType.class, StructType.class, Configuration.class); + Constructor> constructor = + converterClass.getConstructor(Class.class, MessageType.class, StructType.class, confClass); return constructor.newInstance(thriftClass, requestedSchema, descriptor, conf); } catch (IllegalAccessException | NoSuchMethodException e) { // try the other constructor pattern } - Constructor> constructor = - converterClass.getConstructor(Class.class, MessageType.class, StructType.class); + Constructor> constructor = + converterClass.getConstructor(Class.class, MessageType.class, StructType.class); 
return constructor.newInstance(thriftClass, requestedSchema, descriptor); } catch (InstantiationException | InvocationTargetException e) { throw new RuntimeException("Failed to construct Thrift converter class: " + converterClassName, e); diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftWriteSupport.java index a9864ff814..2ac4fccf17 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftWriteSupport.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftWriteSupport.java @@ -19,6 +19,7 @@ package org.apache.parquet.hadoop.thrift; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.thrift.TBase; import org.apache.parquet.hadoop.api.WriteSupport; @@ -68,6 +69,11 @@ public WriteContext init(Configuration configuration) { return this.writeSupport.init(configuration); } + @Override + public WriteContext init(ParquetConfiguration configuration) { + return this.writeSupport.init(configuration); + } + @Override public void prepareForWrite(RecordConsumer recordConsumer) { this.writeSupport.prepareForWrite(recordConsumer); diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java index ba48b37794..f129f36b77 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java @@ -22,6 +22,8 @@ import java.nio.ByteBuffer; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.thrift.TException; import org.apache.thrift.protocol.TField; import org.apache.thrift.protocol.TList; @@ -500,6 +502,11 @@ private String toString(TMap map) { public ParquetWriteProtocol( Configuration configuration, RecordConsumer recordConsumer, MessageColumnIO schema, StructType thriftType) { + this(new HadoopParquetConfiguration(configuration), recordConsumer, schema, thriftType); + } + + public ParquetWriteProtocol( + ParquetConfiguration configuration, RecordConsumer recordConsumer, MessageColumnIO schema, StructType thriftType) { this.recordConsumer = recordConsumer; if (configuration != null) { this.writeThreeLevelList = configuration.getBoolean( diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java index 78fc4a88f5..fa31a5c782 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java @@ -19,6 +19,8 @@ package org.apache.parquet.thrift; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocol; @@ -38,10 +40,15 @@ public class TBaseRecordConverter> extends ThriftRecordConv */ @Deprecated public TBaseRecordConverter(final Class thriftClass, MessageType requestedParquetSchema, StructType thriftType) { - this(thriftClass, requestedParquetSchema, thriftType, null); + this(thriftClass, 
requestedParquetSchema, thriftType, (HadoopParquetConfiguration) null); } + @SuppressWarnings("unused") public TBaseRecordConverter(final Class thriftClass, MessageType requestedParquetSchema, StructType thriftType, Configuration conf) { + this(thriftClass, requestedParquetSchema, thriftType, new HadoopParquetConfiguration(conf)); + } + + public TBaseRecordConverter(final Class thriftClass, MessageType requestedParquetSchema, StructType thriftType, ParquetConfiguration conf) { super(new ThriftReader() { @Override public T readOneRecord(TProtocol protocol) throws TException { diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java index 3244b32110..c92bf631b7 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java @@ -25,6 +25,8 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.thrift.TException; import org.apache.thrift.protocol.TField; import org.apache.thrift.protocol.TList; @@ -843,7 +845,7 @@ public void end() { */ @Deprecated public ThriftRecordConverter(ThriftReader thriftReader, String name, MessageType requestedParquetSchema, ThriftType.StructType thriftType) { - this(thriftReader, name, requestedParquetSchema, thriftType, null); + this(thriftReader, name, requestedParquetSchema, thriftType, (ParquetConfiguration) null); } /** @@ -855,6 +857,18 @@ public ThriftRecordConverter(ThriftReader thriftReader, String name, MessageT * @param conf a Configuration */ public ThriftRecordConverter(ThriftReader thriftReader, String name, MessageType requestedParquetSchema, ThriftType.StructType thriftType, Configuration conf) { + this(thriftReader, name, requestedParquetSchema, thriftType, new HadoopParquetConfiguration(conf)); + } + + /** + * + * @param thriftReader the class responsible for instantiating the final object and read from the protocol + * @param name the name of that type ( the thrift class simple name) + * @param requestedParquetSchema the schema for the incoming columnar events + * @param thriftType the thrift type descriptor + * @param conf a Configuration + */ + public ThriftRecordConverter(ThriftReader thriftReader, String name, MessageType requestedParquetSchema, ThriftType.StructType thriftType, ParquetConfiguration conf) { super(); this.thriftReader = thriftReader; this.protocol = new ParquetReadProtocol(); diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java index f915a6ed11..c32df81477 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java @@ -24,6 +24,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.parquet.ShouldNeverHappenException; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; @@ -80,6 +82,11 @@ class ThriftSchemaConvertVisitor implements ThriftType.StateVisitor clazz = 
configuration.getClassByName(className).asSubclass(TBase.class); thriftWriteSupport = new ThriftWriteSupport(clazz); diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java index 1311d76904..98f22d12a0 100644 --- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java +++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java @@ -33,6 +33,7 @@ import com.twitter.elephantbird.thrift.test.TestMapInList; import com.twitter.elephantbird.thrift.test.TestNameSet; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.ParquetConfiguration; import org.junit.ComparisonFailure; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -708,7 +709,7 @@ private MessageType validatePig(String[] expectations, TBase a) { MessageType schema = new PigSchemaConverter().convert(pigSchema); LOG.info("{}", schema); TupleWriteSupport tupleWriteSupport = new TupleWriteSupport(pigSchema); - tupleWriteSupport.init(null); + tupleWriteSupport.init((ParquetConfiguration) null); tupleWriteSupport.prepareForWrite(recordConsumer); final Tuple pigTuple = thriftToPig.getPigTuple(a); LOG.info("{}", pigTuple); diff --git a/pom.xml b/pom.xml index 1fd4b06462..32df2b2c88 100644 --- a/pom.xml +++ b/pom.xml @@ -547,6 +547,8 @@ ${shade.prefix} + org.apache.parquet.hadoop.CodecFactory + org.apache.parquet.hadoop.ParquetReader org.apache.parquet.thrift.projection.deprecated.PathGlobPattern From bf43bbda6d68b9417ecbee83a438ae14e43a81e8 Mon Sep 17 00:00:00 2001 From: Atour <28668597+amousavigourabi@users.noreply.github.com> Date: Sun, 24 Sep 2023 18:25:25 +0200 Subject: [PATCH 2/6] Update parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java Co-authored-by: Gang Wu --- .../main/java/org/apache/parquet/hadoop/api/InitContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java index b2e36c842a..5ef453925b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java @@ -90,7 +90,7 @@ public Configuration getConfiguration() { return ConfigurationUtil.createHadoopConfiguration(configuration); } - public ParquetConfiguration getConfig() { + public ParquetConfiguration getParquetConfiguration() { return configuration; } From c5cd17ccec357344948203070e48f9f7aee5ece4 Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Sun, 24 Sep 2023 18:26:18 +0200 Subject: [PATCH 3/6] Address comments --- .../apache/parquet/hadoop/api/InitContext.java | 3 +++ .../apache/parquet/hadoop/api/ReadSupport.java | 2 +- .../org/apache/parquet/pig/TupleReadSupport.java | 6 +++--- .../apache/parquet/proto/ProtoReadSupport.java | 2 +- .../thrift/AbstractThriftWriteSupport.java | 2 +- .../parquet/hadoop/thrift/ThriftReadSupport.java | 16 ++++++++-------- 6 files changed, 17 insertions(+), 14 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java index 5ef453925b..3d80811aa7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java +++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java @@ -90,6 +90,9 @@ public Configuration getConfiguration() { return ConfigurationUtil.createHadoopConfiguration(configuration); } + /** + * @return the Parquet configuration for this job + */ public ParquetConfiguration getParquetConfiguration() { return configuration; } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java index 5908bf6ca3..978d5a61f5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java @@ -101,7 +101,7 @@ public ReadContext init( * @return the readContext that defines how to read the file */ public ReadContext init(InitContext context) { - return init(context.getConfig(), context.getMergedKeyValueMetaData(), context.getFileSchema()); + return init(context.getParquetConfiguration(), context.getMergedKeyValueMetaData(), context.getFileSchema()); } /** diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java index 6ff68f3866..cca1a91f2d 100644 --- a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java +++ b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java @@ -172,9 +172,9 @@ private static FieldSchema union(FieldSchema mergedFieldSchema, FieldSchema newF @Override public ReadContext init(InitContext initContext) { - Schema pigSchema = getPigSchema(initContext.getConfig()); - RequiredFieldList requiredFields = getRequiredFields(initContext.getConfig()); - boolean columnIndexAccess = initContext.getConfig().getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false); + Schema pigSchema = getPigSchema(initContext.getParquetConfiguration()); + RequiredFieldList requiredFields = getRequiredFields(initContext.getParquetConfiguration()); + boolean columnIndexAccess = initContext.getParquetConfiguration().getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false); if (pigSchema == null) { return new ReadContext(initContext.getFileSchema()); diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java index 5a4568daae..6343992e58 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java @@ -61,7 +61,7 @@ public static void setProtobufClass(Configuration configuration, String protobuf @Override public ReadContext init(InitContext context) { - String requestedProjectionString = context.getConfig().get(PB_REQUESTED_PROJECTION); + String requestedProjectionString = context.getParquetConfiguration().get(PB_REQUESTED_PROJECTION); if (requestedProjectionString != null && !requestedProjectionString.trim().isEmpty()) { MessageType requestedProjection = getSchemaForRead(context.getFileSchema(), requestedProjectionString); diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java index 43337052e6..baa3ddb92f 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java @@ 
-53,7 +53,7 @@ public static void setGenericThriftClass(ParquetConfiguration configuration, Cla configuration.set(PARQUET_THRIFT_CLASS, thriftClass.getName()); } - public static Class getGenericThriftClass(Configuration configuration) { + public static Class getGenericThriftClass(Configuration configuration) { return getGenericThriftClass(new HadoopParquetConfiguration(configuration)); } diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java index 30cf0986e1..bd9530b82b 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java @@ -161,7 +161,7 @@ public ThriftReadSupport(Class thriftClass) { @Override public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(InitContext context) { - final ParquetConfiguration configuration = context.getConfig(); + final ParquetConfiguration configuration = context.getParquetConfiguration(); final MessageType fileMessageType = context.getFileSchema(); MessageType requestedProjection = fileMessageType; String partialSchemaString = configuration.get(ReadSupport.PARQUET_READ_SCHEMA); @@ -308,13 +308,13 @@ private static ThriftRecordConverter getRecordConverterInstance( } @SuppressWarnings("unchecked") - private static ThriftRecordConverter getRecordConverterInstance( - String converterClassName, Class thriftClass, - MessageType requestedSchema, StructType descriptor, T2 conf, Class confClass) { + private static ThriftRecordConverter getRecordConverterInstance( + String converterClassName, Class thriftClass, + MessageType requestedSchema, StructType descriptor, CONF conf, Class confClass) { - Class> converterClass; + Class> converterClass; try { - converterClass = (Class>) Class.forName(converterClassName); + converterClass = (Class>) Class.forName(converterClassName); } catch (ClassNotFoundException e) { throw new RuntimeException("Cannot find Thrift converter class: " + converterClassName, e); } @@ -322,14 +322,14 @@ private static ThriftRecordConverter getRecordConverterInstance( try { // first try the new version that accepts a Configuration try { - Constructor> constructor = + Constructor> constructor = converterClass.getConstructor(Class.class, MessageType.class, StructType.class, confClass); return constructor.newInstance(thriftClass, requestedSchema, descriptor, conf); } catch (IllegalAccessException | NoSuchMethodException e) { // try the other constructor pattern } - Constructor> constructor = + Constructor> constructor = converterClass.getConstructor(Class.class, MessageType.class, StructType.class); return constructor.newInstance(thriftClass, requestedSchema, descriptor); } catch (InstantiationException | InvocationTargetException e) { From 3315ba822c8daa84bdc6b643b726c1a5ddac2161 Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Sun, 24 Sep 2023 18:29:39 +0200 Subject: [PATCH 4/6] Add comments at compatibility exclusions --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 32df2b2c88..b59a2c259b 100644 --- a/pom.xml +++ b/pom.xml @@ -547,8 +547,8 @@ ${shade.prefix} - org.apache.parquet.hadoop.CodecFactory - org.apache.parquet.hadoop.ParquetReader + org.apache.parquet.hadoop.CodecFactory + org.apache.parquet.hadoop.ParquetReader 
org.apache.parquet.thrift.projection.deprecated.PathGlobPattern From 8ff97ea1fe197caf40f4338119a8dd372c8ac862 Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Sun, 24 Sep 2023 21:42:26 +0200 Subject: [PATCH 5/6] Remove unnecessary Configuration --- .../org/apache/parquet/hadoop/InternalParquetRecordReader.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java index d94d5d2ef4..0085972615 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java @@ -173,9 +173,6 @@ public void initialize(ParquetFileReader reader, ParquetReadOptions options) { for (String property : options.getPropertyNames()) { conf.set(property, options.getProperty(property)); } - for (Map.Entry property : new Configuration()) { - conf.set(property.getKey(), property.getValue()); - } // initialize a ReadContext for this file this.reader = reader; From 24bf290d5705583f750c9b0c3607b9707fd9376b Mon Sep 17 00:00:00 2001 From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com> Date: Fri, 13 Oct 2023 17:48:24 +0200 Subject: [PATCH 6/6] Review changes --- .../parquet/avro/AvroParquetOutputFormat.java | 2 - .../apache/parquet/avro/AvroReadSupport.java | 4 - .../apache/parquet/avro/AvroWriteSupport.java | 4 - .../parquet/conf/ParquetConfiguration.java | 118 +++++++++++++++++- .../apache/parquet/ParquetReadOptions.java | 5 +- .../org/apache/parquet/hadoop/DirectZstd.java | 4 - .../hadoop/InternalParquetRecordReader.java | 1 - .../parquet/hadoop/ParquetInputFormat.java | 18 --- .../apache/parquet/hadoop/ParquetWriter.java | 22 ++-- .../parquet/hadoop/api/ReadSupport.java | 12 +- .../parquet/hadoop/api/WriteSupport.java | 2 +- .../parquet/thrift/ThriftRecordConverter.java | 1 - 12 files changed, 137 insertions(+), 56 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java index afbaefcb00..9195925f7e 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java @@ -19,9 +19,7 @@ package org.apache.parquet.avro; import org.apache.avro.Schema; -import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.mapreduce.Job; -import org.apache.parquet.avro.AvroWriteSupport; import org.apache.parquet.hadoop.ParquetOutputFormat; import org.apache.parquet.hadoop.util.ContextUtil; diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java index 9b165548e5..0bda3d02ed 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java @@ -170,10 +170,6 @@ private static RecordMaterializer newCompatMaterializer( parquetSchema, avroSchema, model); } - private GenericData getDataModel(Configuration conf, Schema schema) { - return getDataModel(new HadoopParquetConfiguration(conf), schema); - } - private GenericData getDataModel(ParquetConfiguration conf, Schema schema) { if (model != null) { return model; diff --git 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java index 401886e72e..692e3fac0f 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java @@ -412,10 +412,6 @@ private Binary fromAvroString(Object value) { return Binary.fromCharSequence(value.toString()); } - private static GenericData getDataModel(Configuration conf, Schema schema) { - return getDataModel(new HadoopParquetConfiguration(conf), schema); - } - private static GenericData getDataModel(ParquetConfiguration conf, Schema schema) { if (conf.get(AVRO_DATA_SUPPLIER) == null && schema != null) { GenericData modelForSchema; diff --git a/parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java b/parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java index 8e7c80d37e..f8aae97297 100644 --- a/parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java +++ b/parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java @@ -22,41 +22,157 @@ import java.util.Map; /** - * Configuration interface with the methods necessary to configure for Parquet applications. + * Configuration interface with the methods necessary to configure Parquet applications. */ public interface ParquetConfiguration extends Iterable> { + /** + * Sets the value of the name property. + * + * @param name the property to set + * @param value the value to set the property to + */ void set(String name, String value); + /** + * Sets the value of the name property to a long. + * + * @param name the property to set + * @param value the value to set the property to + */ void setLong(String name, long value); + /** + * Sets the value of the name property to an integer. + * + * @param name the property to set + * @param value the value to set the property to + */ void setInt(String name, int value); + /** + * Sets the value of the name property to a boolean. + * + * @param name the property to set + * @param value the value to set the property to + */ void setBoolean(String name, boolean value); + /** + * Sets the value of the name property to an array of comma delimited values. + * + * @param name the property to set + * @param value the values to set the property to + */ void setStrings(String name, String... value); + /** + * Sets the value of the name property to a class. + * + * @param name the property to set + * @param value the value to set the property to + * @param xface the interface implemented by the value + */ void setClass(String name, Class value, Class xface); + /** + * Gets the value of the name property. Returns null if no such value exists. + * + * @param name the property to retrieve the value of + * @return the value of the property, or null if it does not exist + */ String get(String name); + /** + * Gets the value of the name property. Returns the default value if no such value exists. + * + * @param name the property to retrieve the value of + * @param defaultValue the default return if no value is set for the property + * @return the value of the property, or the default value if it does not exist + */ String get(String name, String defaultValue); + /** + * Gets the value of the name property as a long. Returns the default value if no such value exists. 
+ * + * @param name the property to retrieve the value of + * @param defaultValue the default return if no value is set for the property + * @return the value of the property as a long, or the default value if it does not exist + */ long getLong(String name, long defaultValue); + /** + * Gets the value of the name property as an integer. Returns the default value if no such value exists. + * + * @param name the property to retrieve the value of + * @param defaultValue the default return if no value is set for the property + * @return the value of the property as an integer, or the default value if it does not exist + */ int getInt(String name, int defaultValue); + /** + * Gets the value of the name property as a boolean. Returns the default value if no such value exists. + * + * @param name the property to retrieve the value of + * @param defaultValue the default return if no value is set for the property + * @return the value of the property as a boolean, or the default value if it does not exist + */ boolean getBoolean(String name, boolean defaultValue); + /** + * Gets the trimmed value of the name property. Returns null if no such value exists. + * + * @param name the property to retrieve the value of + * @return the trimmed value of the property, or null if it does not exist + */ String getTrimmed(String name); + /** + * Gets the trimmed value of the name property. + * Returns the default value if no such value exists. + * + * @param name the property to retrieve the value of + * @param defaultValue the default return if no value is set for the property + * @return the trimmed value of the property, or the default value if it does not exist + */ String getTrimmed(String name, String defaultValue); + /** + * Gets the value of the name property as an array of {@link String}s. + * Returns the default value if no such value exists. + * Interprets the stored value as a comma delimited array. + * + * @param name the property to retrieve the value of + * @param defaultValue the default return if no value is set for the property + * @return the value of the property as an array, or the default value if it does not exist + */ String[] getStrings(String name, String[] defaultValue); + /** + * Gets the value of the name property as a class. Returns the default value if no such value exists. + * + * @param name the property to retrieve the value of + * @param defaultValue the default return if no value is set for the property + * @return the value of the property as a class, or the default value if it does not exist + */ Class getClass(String name, Class defaultValue); + /** + * Gets the value of the name property as a class implementing the xface interface. + * Returns the default value if no such value exists. + * + * @param name the property to retrieve the value of + * @param defaultValue the default return if no value is set for the property + * @return the value of the property as a class, or the default value if it does not exist + */ Class getClass(String name, Class defaultValue, Class xface); + /** + * Load a class by name.
+ * + * @param name the name of the {@link Class} to load + * @return the loaded class + * @throws ClassNotFoundException when the specified class cannot be found + */ Class getClassByName(String name) throws ClassNotFoundException; } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java index 28e90522e4..8e93dc4ad5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java @@ -19,7 +19,6 @@ package org.apache.parquet; -import org.apache.hadoop.conf.Configuration; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.compression.CompressionCodecFactory; @@ -208,7 +207,7 @@ public ParquetConfiguration getConfiguration() { } public static Builder builder() { - return new Builder(new HadoopParquetConfiguration()); + return new Builder(); } public static Builder builder(ParquetConfiguration conf) { @@ -235,7 +234,7 @@ public static class Builder { protected ParquetConfiguration conf; public Builder() { - conf = new HadoopParquetConfiguration(); + this(new HadoopParquetConfiguration()); } public Builder(ParquetConfiguration conf) { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java index 620ddf1cd6..588f93c892 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java @@ -143,10 +143,6 @@ BytesInput getBytesInput() { } } - private static BufferPool getPool(Configuration conf) { - return getPool(new HadoopParquetConfiguration(conf)); - } - private static BufferPool getPool(ParquetConfiguration conf) { if (conf.getBoolean(PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)) { return RecyclingBufferPool.INSTANCE; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java index 0085972615..36da819fa7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java @@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.conf.ParquetConfiguration; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java index 3e96535794..7d355af78c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java @@ -195,24 +195,6 @@ public static Class getUnboundRecordFilter(Configuration configuration) { return ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class); } - private static UnboundRecordFilter getUnboundRecordFilterInstance(Configuration configuration) { - Class clazz = ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class); - if 
(clazz == null) { return null; } - - try { - UnboundRecordFilter unboundRecordFilter = (UnboundRecordFilter) clazz.newInstance(); - - if (unboundRecordFilter instanceof Configurable) { - ((Configurable)unboundRecordFilter).setConf(configuration); - } - - return unboundRecordFilter; - } catch (InstantiationException | IllegalAccessException e) { - throw new BadConfigurationException( - "could not instantiate unbound record filter class", e); - } - } - private static UnboundRecordFilter getUnboundRecordFilterInstance(ParquetConfiguration configuration) { Class clazz = ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class); if (clazz == null) { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 94aabb88fd..2e888722e3 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -293,16 +293,16 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport } ParquetWriter( - OutputFile file, - ParquetFileWriter.Mode mode, - WriteSupport writeSupport, - CompressionCodecName compressionCodecName, - long rowGroupSize, - boolean validating, - ParquetConfiguration conf, - int maxPaddingSize, - ParquetProperties encodingProps, - FileEncryptionProperties encryptionProperties) throws IOException { + OutputFile file, + ParquetFileWriter.Mode mode, + WriteSupport writeSupport, + CompressionCodecName compressionCodecName, + long rowGroupSize, + boolean validating, + ParquetConfiguration conf, + int maxPaddingSize, + ParquetProperties encodingProps, + FileEncryptionProperties encryptionProperties) throws IOException { WriteSupport.WriteContext writeContext = writeSupport.init(conf); MessageType schema = writeContext.getSchema(); @@ -414,7 +414,7 @@ protected Builder(OutputFile path) { * @return an appropriate WriteSupport for the object model. 
*/ protected WriteSupport getWriteSupport(ParquetConfiguration conf) { - throw new UnsupportedOperationException("Override getWriteSupport(ParquetConfiguration)"); + throw new UnsupportedOperationException("Override ParquetWriter$Builder#getWriteSupport(ParquetConfiguration)"); } /** diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java index 978d5a61f5..a3dfe2caa2 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java @@ -70,10 +70,10 @@ public static MessageType getSchemaForRead(MessageType fileMessageType, MessageT */ @Deprecated public ReadContext init( - Configuration configuration, - Map keyValueMetaData, - MessageType fileSchema) { - throw new UnsupportedOperationException("Override init(InitContext)"); + Configuration configuration, + Map keyValueMetaData, + MessageType fileSchema) { + throw new UnsupportedOperationException("Override ReadSupport.init(InitContext)"); } /** @@ -91,7 +91,7 @@ public ReadContext init( ParquetConfiguration configuration, Map keyValueMetaData, MessageType fileSchema) { - throw new UnsupportedOperationException("Override init(InitContext)"); + throw new UnsupportedOperationException("Override ReadSupport.init(InitContext)"); } /** @@ -135,7 +135,7 @@ public RecordMaterializer prepareForRead( Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) { - throw new UnsupportedOperationException("Override prepareForRead(ParquetConfiguration, Map, MessageType, ReadContext)"); + throw new UnsupportedOperationException("Override ReadSupport.prepareForRead(ParquetConfiguration, Map, MessageType, ReadContext)"); } /** diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java index a128d29179..b73e102c20 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java @@ -112,7 +112,7 @@ public Map getExtraMetaData() { * @return the information needed to write the file */ public WriteContext init(ParquetConfiguration configuration) { - throw new UnsupportedOperationException("Override init(ParquetConfiguration)"); + throw new UnsupportedOperationException("Override WriteSupport#init(ParquetConfiguration)"); } /** diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java index c92bf631b7..d0649212fb 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java @@ -861,7 +861,6 @@ public ThriftRecordConverter(ThriftReader thriftReader, String name, MessageT } /** - * * @param thriftReader the class responsible for instantiating the final object and read from the protocol * @param name the name of that type ( the thrift class simple name) * @param requestedParquetSchema the schema for the incoming columnar events