PARQUET-2347: Add interface layer between Parquet and Hadoop Configuration #1141

Merged
Changes from all commits
@@ -19,9 +19,7 @@
package org.apache.parquet.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.util.ContextUtil;

-------------------- next changed file --------------------
@@ -26,6 +26,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.api.ReadSupport;
@@ -53,6 +54,10 @@ public static <T> Builder<T> builder(InputFile file) {
return new Builder<T>(file);
}

public static <T> Builder<T> builder(InputFile file, ParquetConfiguration conf) {
return new Builder<T>(file, conf);
}

/**
* Convenience method for creating a ParquetReader which uses Avro
* {@link GenericData} objects to store data from reads.
@@ -67,6 +72,21 @@ public static ParquetReader<GenericRecord> genericRecordReader(InputFile file) throws IOException {
return new Builder<GenericRecord>(file).withDataModel(GenericData.get()).build();
}

/**
* Convenience method for creating a ParquetReader which uses Avro
* {@link GenericData} objects to store data from reads.
*
* @param file The location to read data from
* @param conf The configuration to use
* @return A {@code ParquetReader} which reads data as Avro
* {@code GenericData}
* @throws IOException if the InputFile has been closed, or if some other I/O
* error occurs
*/
public static ParquetReader<GenericRecord> genericRecordReader(InputFile file, ParquetConfiguration conf) throws IOException {
return new Builder<GenericRecord>(file, conf).withDataModel(GenericData.get()).build();
}

/**
* Convenience method for creating a ParquetReader which uses Avro
* {@link GenericData} objects to store data from reads.
@@ -143,6 +163,10 @@ private Builder(InputFile file) {
super(file);
}

private Builder(InputFile file, ParquetConfiguration conf) {
super(file, conf);
}

public Builder<T> withDataModel(GenericData model) {
this.model = model;

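For orientation, a minimal usage sketch of the new reader overloads added above. It assumes the HadoopParquetConfiguration implementation introduced by this PR and the existing LocalInputFile; the file name is illustrative only.

import java.io.IOException;
import java.nio.file.Paths;

import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.io.LocalInputFile;

public class ReadWithConfInterface {
  public static void main(String[] args) throws IOException {
    // Wraps a fresh Hadoop-backed configuration behind the new interface,
    // mirroring new Configuration(true).
    ParquetConfiguration conf = new HadoopParquetConfiguration(true);
    try (ParquetReader<GenericRecord> reader = AvroParquetReader
        .genericRecordReader(new LocalInputFile(Paths.get("data.parquet")), conf)) {
      GenericRecord record;
      while ((record = reader.read()) != null) {
        System.out.println(record);
      }
    }
  }
}

The call site contains no Hadoop types; the Hadoop dependency is confined to the configuration wrapper.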
-------------------- next changed file --------------------
@@ -25,6 +25,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -154,6 +156,12 @@ private static <T> WriteSupport<T> writeSupport(Schema avroSchema,
private static <T> WriteSupport<T> writeSupport(Configuration conf,
Schema avroSchema,
GenericData model) {
return writeSupport(new HadoopParquetConfiguration(conf), avroSchema, model);
}

private static <T> WriteSupport<T> writeSupport(ParquetConfiguration conf,
Schema avroSchema,
GenericData model) {
return new AvroWriteSupport<T>(
new AvroSchemaConverter(conf).convert(avroSchema), avroSchema, model);
}
@@ -189,5 +197,10 @@ protected Builder<T> self() {
protected WriteSupport<T> getWriteSupport(Configuration conf) {
return AvroParquetWriter.writeSupport(conf, schema, model);
}

@Override
protected WriteSupport<T> getWriteSupport(ParquetConfiguration conf) {
return AvroParquetWriter.writeSupport(conf, schema, model);
}
}
}
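A companion sketch for the write path. The withConf(ParquetConfiguration) builder overload is the one exercised by the tests further down; the output path, schema, and record are illustrative.

import java.io.IOException;
import java.nio.file.Paths;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.io.LocalOutputFile;

public class WriteWithConfInterface {
  public static void write(Schema schema, GenericRecord record) throws IOException {
    ParquetConfiguration conf = new HadoopParquetConfiguration(true);
    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
        .<GenericRecord>builder(new LocalOutputFile(Paths.get("out.parquet")))
        .withSchema(schema)
        .withConf(conf) // ParquetConfiguration overload, no Hadoop types at the call site
        .build()) {
      writer.write(record);
    }
  }
}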
-------------------- next changed file --------------------
@@ -24,7 +24,10 @@
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
@@ -95,6 +98,13 @@ public AvroReadSupport(GenericData model) {
public ReadContext init(Configuration configuration,
Map<String, String> keyValueMetaData,
MessageType fileSchema) {
return init(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema);
}

@Override
public ReadContext init(ParquetConfiguration configuration,
Map<String, String> keyValueMetaData,
MessageType fileSchema) {
MessageType projection = fileSchema;
Map<String, String> metadata = new LinkedHashMap<String, String>();

@@ -120,6 +130,13 @@ public ReadContext init(Configuration configuration,
public RecordMaterializer<T> prepareForRead(
Configuration configuration, Map<String, String> keyValueMetaData,
MessageType fileSchema, ReadContext readContext) {
return prepareForRead(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema, readContext);
}

@Override
public RecordMaterializer<T> prepareForRead(
ParquetConfiguration configuration, Map<String, String> keyValueMetaData,
MessageType fileSchema, ReadContext readContext) {
Map<String, String> metadata = readContext.getReadSupportMetadata();
MessageType parquetSchema = readContext.getRequestedSchema();
Schema avroSchema;
@@ -153,7 +170,7 @@ private static <T> RecordMaterializer<T> newCompatMaterializer(
parquetSchema, avroSchema, model);
}

private GenericData getDataModel(Configuration conf, Schema schema) {
private GenericData getDataModel(ParquetConfiguration conf, Schema schema) {
if (model != null) {
return model;
}
@@ -175,6 +192,6 @@ private GenericData getDataModel(Configuration conf, Schema schema) {

Class<? extends AvroDataSupplier> suppClass = conf.getClass(
AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
return ReflectionUtils.newInstance(suppClass, conf).get();
return ReflectionUtils.newInstance(suppClass, ConfigurationUtil.createHadoopConfiguration(conf)).get();
}
}
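The migration pattern in this file is worth calling out: the Hadoop-typed entry points become one-line shims and the real logic moves behind ParquetConfiguration. A sketch of the same pattern for a hypothetical downstream ReadSupport; MyReadSupport is illustrative, not part of the PR.

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.schema.MessageType;

public abstract class MyReadSupport<T> extends ReadSupport<T> {
  @Override
  public ReadContext init(Configuration conf,
                          Map<String, String> keyValueMetaData,
                          MessageType fileSchema) {
    // Keep the Hadoop overload as a thin bridge for existing callers.
    return init(new HadoopParquetConfiguration(conf), keyValueMetaData, fileSchema);
  }

  @Override
  public ReadContext init(ParquetConfiguration conf,
                          Map<String, String> keyValueMetaData,
                          MessageType fileSchema) {
    // Hadoop-free implementation; a real subclass would compute a
    // projection of fileSchema here rather than requesting it whole.
    return new ReadContext(fileSchema);
  }
}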
-------------------- next changed file --------------------
@@ -23,6 +23,8 @@
import org.apache.avro.Schema;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.schema.ConversionPatterns;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
@@ -102,6 +104,10 @@ public AvroSchemaConverter() {
}

public AvroSchemaConverter(Configuration conf) {
this(new HadoopParquetConfiguration(conf));
}

public AvroSchemaConverter(ParquetConfiguration conf) {
this.assumeRepeatedIsListElement = conf.getBoolean(
ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT);
this.writeOldListStructure = conf.getBoolean(
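Since the Hadoop-typed constructor now funnels into the ParquetConfiguration one, the two forms below read identical settings. A short sketch; the flag shown is one of the converter's existing list-handling options.

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.conf.HadoopParquetConfiguration;

public class ConverterConfEquivalence {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    hadoopConf.setBoolean("parquet.avro.add-list-element-records", false);

    // Equivalent: the Hadoop-typed constructor wraps and delegates.
    AvroSchemaConverter viaHadoop = new AvroSchemaConverter(hadoopConf);
    AvroSchemaConverter viaInterface =
        new AvroSchemaConverter(new HadoopParquetConfiguration(hadoopConf));
  }
}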
-------------------- next changed file --------------------
@@ -34,7 +34,10 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
@@ -129,6 +132,11 @@ public static void setSchema(Configuration configuration, Schema schema) {

@Override
public WriteContext init(Configuration configuration) {
return init(new HadoopParquetConfiguration(configuration));
}

@Override
public WriteContext init(ParquetConfiguration configuration) {
if (rootAvroSchema == null) {
this.rootAvroSchema = new Schema.Parser().parse(configuration.get(AVRO_SCHEMA));
this.rootSchema = new AvroSchemaConverter(configuration).convert(rootAvroSchema);
@@ -404,7 +412,7 @@ private Binary fromAvroString(Object value) {
return Binary.fromCharSequence(value.toString());
}

private static GenericData getDataModel(Configuration conf, Schema schema) {
private static GenericData getDataModel(ParquetConfiguration conf, Schema schema) {
if (conf.get(AVRO_DATA_SUPPLIER) == null && schema != null) {
GenericData modelForSchema;
try {
@@ -423,7 +431,7 @@ private static GenericData getDataModel(Configuration conf, Schema schema) {

Class<? extends AvroDataSupplier> suppClass = conf.getClass(
AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
return ReflectionUtils.newInstance(suppClass, conf).get();
return ReflectionUtils.newInstance(suppClass, ConfigurationUtil.createHadoopConfiguration(conf)).get();
}

private abstract class ListWriter {
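One detail in the getDataModel changes deserves a note: ReflectionUtils still requires a real Hadoop Configuration, so the code converts back at that single call site via ConfigurationUtil.createHadoopConfiguration. A sketch of that escape hatch, assuming the helper unwraps a HadoopParquetConfiguration or otherwise copies its properties into a fresh Configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.parquet.avro.AvroDataSupplier;
import org.apache.parquet.avro.SpecificDataSupplier;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.util.ConfigurationUtil;

public class BoundaryConversion {
  public static AvroDataSupplier supplier(ParquetConfiguration conf) {
    // Hadoop-only APIs are confined behind one conversion at the boundary.
    Configuration hadoopConf = ConfigurationUtil.createHadoopConfiguration(conf);
    return ReflectionUtils.newInstance(SpecificDataSupplier.class, hadoopConf);
  }
}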
-------------------- next changed file --------------------
@@ -52,6 +52,8 @@
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
@@ -77,22 +79,31 @@ public class TestReadWrite {
@Parameterized.Parameters
public static Collection<Object[]> data() {
Object[][] data = new Object[][] {
{ false, false }, // use the new converters
{ true, false }, // use the old converters
{ false, true } }; // use a local disk location
{ false, false, false }, // use the new converters with hadoop config
{ true, false, false }, // use the old converters with hadoop config
{ false, true, false }, // use a local disk location with hadoop config
{ false, false, true }, // use the new converters with parquet config interface
{ true, false, true }, // use the old converters with parquet config interface
{ false, true, true } }; // use a local disk location with parquet config interface
return Arrays.asList(data);
}

private final boolean compat;
private final boolean local;
private final boolean confInterface;
private final Configuration testConf = new Configuration();
private final ParquetConfiguration parquetConf = new HadoopParquetConfiguration(true);

public TestReadWrite(boolean compat, boolean local) {
public TestReadWrite(boolean compat, boolean local, boolean confInterface) {
this.compat = compat;
this.local = local;
this.confInterface = confInterface;
this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
testConf.setBoolean("parquet.avro.add-list-element-records", false);
testConf.setBoolean("parquet.avro.write-old-list-structure", false);
this.testConf.setBoolean("parquet.avro.add-list-element-records", false);
this.testConf.setBoolean("parquet.avro.write-old-list-structure", false);
this.parquetConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
this.parquetConf.setBoolean("parquet.avro.add-list-element-records", false);
this.parquetConf.setBoolean("parquet.avro.write-old-list-structure", false);
}

@Test
@@ -431,6 +442,11 @@ public void testAllUsingDefaultAvroSchema() throws Exception {

@Override
public WriteContext init(Configuration configuration) {
return init(new HadoopParquetConfiguration(configuration));
}

@Override
public WriteContext init(ParquetConfiguration configuration) {
return new WriteContext(MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA),
new HashMap<String, String>());
}
@@ -864,30 +880,44 @@ private File createTempFile() throws IOException {
}

private ParquetWriter<GenericRecord> writer(String file, Schema schema) throws IOException {
AvroParquetWriter.Builder<GenericRecord> writerBuilder;
if (local) {
return AvroParquetWriter
writerBuilder = AvroParquetWriter
.<GenericRecord>builder(new LocalOutputFile(Paths.get(file)))
.withSchema(schema)
.withConf(testConf)
.build();
.withSchema(schema);
} else {
return AvroParquetWriter
writerBuilder = AvroParquetWriter
.<GenericRecord>builder(new Path(file))
.withSchema(schema)
.withSchema(schema);
}
if (confInterface) {
return writerBuilder
.withConf(parquetConf)
.build();
} else {
return writerBuilder
.withConf(testConf)
.build();
}
}

private ParquetReader<GenericRecord> reader(String file) throws IOException {
AvroParquetReader.Builder<GenericRecord> readerBuilder;
if (local) {
return AvroParquetReader
readerBuilder = AvroParquetReader
.<GenericRecord>builder(new LocalInputFile(Paths.get(file)))
.withDataModel(GenericData.get())
.withConf(testConf)
.withDataModel(GenericData.get());
} else {
return new AvroParquetReader<>(testConf, new Path(file));
}
if (confInterface) {
return readerBuilder
.withConf(parquetConf)
.build();
} else {
return new AvroParquetReader(testConf, new Path(file));
return readerBuilder
.withConf(testConf)
.build();
}
}

-------------------- next changed file --------------------
@@ -38,6 +38,8 @@
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.Binary;
@@ -358,6 +360,11 @@ public void testAllUsingDefaultAvroSchema() throws Exception {

@Override
public WriteContext init(Configuration configuration) {
return init(new HadoopParquetConfiguration(configuration));
}

@Override
public WriteContext init(ParquetConfiguration configuration) {
return new WriteContext(MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA),
new HashMap<String, String>());
}