Removed superfluous overrides of the HoodieBaseParquetConfig;
`HoodieBaseParquetConfig` > `HoodieParquetConfig`
Alexey Kudinkin committed Jun 24, 2022
1 parent a00533a commit b086b2a
Showing 15 changed files with 43 additions and 154 deletions.
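
The change collapses the engine-specific config subclasses (the three deleted files below) into the single generic `HoodieParquetConfig<T>`. As a minimal sketch of what a call site looks like after the rename — the helper name and the ratio value are illustrative, the other constants mirror the test-utils hunk below:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetConfigSketch {
  // Call sites now parametrize the shared config class with their WriteSupport
  // type instead of instantiating a dedicated subclass such as HoodieAvroParquetConfig.
  static HoodieParquetConfig<HoodieAvroWriteSupport> avroConfig(
      HoodieAvroWriteSupport writeSupport, Configuration hadoopConf) {
    return new HoodieParquetConfig<>(
        writeSupport,
        CompressionCodecName.GZIP,
        ParquetWriter.DEFAULT_BLOCK_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE,
        120 * 1024 * 1024,  // max file size in bytes
        hadoopConf,
        0.1);               // compression ratio estimate; the 7-arg form leaves dictionary encoding off
  }
}
```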
@@ -23,12 +23,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroWriteSupport
import org.apache.hudi.client.SparkTaskContextSupplier
-import org.apache.hudi.common.HoodieJsonPayload
import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.util.BaseFileUtils
import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
-import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieAvroParquetWriter}
+import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, HoodieParquetConfig}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -45,7 +44,7 @@ object SparkHelpers {
val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_TYPE.defaultValue);
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(fs.getConf).convert(schema), schema, org.apache.hudi.common.util.Option.of(filter))
-val parquetConfig: HoodieAvroParquetConfig = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt, fs.getConf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble)
+val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport] = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt, fs.getConf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble)

// Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
@@ -47,11 +47,11 @@ public class HoodieAvroParquetWriter<R extends IndexedRecord>

@SuppressWarnings({"unchecked", "rawtypes"})
public HoodieAvroParquetWriter(Path file,
-HoodieAvroParquetConfig parquetConfig,
+HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig,
String instantTime,
TaskContextSupplier taskContextSupplier,
boolean populateMetaFields) throws IOException {
-super(file, (HoodieBaseParquetConfig) parquetConfig);
+super(file, (HoodieParquetConfig) parquetConfig);
this.fileName = file.getName();
this.writeSupport = parquetConfig.getWriteSupport();
this.instantTime = instantTime;
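
The raw-type cast in the `super(...)` call above (and the `@SuppressWarnings({"unchecked", "rawtypes"})` that licenses it) is about generics variance, not the rename: the base writer accepts a `HoodieParquetConfig<? extends WriteSupport<R>>`, and a `HoodieParquetConfig<HoodieAvroWriteSupport>` only satisfies that bound when `R` is exactly `IndexedRecord`, not for an arbitrary `R extends IndexedRecord`. A self-contained sketch of the same shape, with hypothetical stand-in names:

```java
// Stand-ins for WriteSupport<R>, HoodieAvroWriteSupport, and HoodieParquetConfig<T>.
interface Support<R> {}
class AvroSupport implements Support<Object> {}
class Config<T> {}

// Plays the role of HoodieBaseParquetWriter<R>.
class BaseWriter<R> {
  BaseWriter(Config<? extends Support<R>> config) {}
}

// Plays the role of HoodieAvroParquetWriter<R>.
class AvroWriter<R> extends BaseWriter<R> {
  @SuppressWarnings({"unchecked", "rawtypes"})
  AvroWriter(Config<AvroSupport> config) {
    // Config<AvroSupport> is not a Config<? extends Support<R>> for an arbitrary R,
    // so the generic is erased through a raw cast, exactly as in the hunk above.
    super((Config) config);
  }
}
```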
@@ -43,7 +43,7 @@ public abstract class HoodieBaseParquetWriter<R> extends ParquetWriter<R> {
private long lastCachedDataSize = -1;

public HoodieBaseParquetWriter(Path file,
-HoodieBaseParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
+HoodieParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE,
parquetConfig.getWriteSupport(),
@@ -77,7 +77,7 @@ private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFi
Option<BloomFilter> filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(conf).convert(schema), schema, filter);

-HoodieAvroParquetConfig parquetConfig = new HoodieAvroParquetConfig(writeSupport, config.getParquetCompressionCodec(),
+HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport, config.getParquetCompressionCodec(),
config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(),
conf, config.getParquetCompressionRatio(), config.parquetDictionaryEnabled());

@@ -19,6 +19,12 @@

package org.apache.hudi.testutils;

+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
@@ -38,18 +44,11 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieStorageConfig;
-import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
-import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.io.storage.HoodieOrcConfig;
import org.apache.hudi.io.storage.HoodieOrcWriter;
+import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;

-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.orc.CompressionKind;
@@ -110,7 +109,7 @@ public Path withInserts(String partition, String fileId, List<HoodieRecord> reco
if (HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().equals(HoodieFileFormat.PARQUET)) {
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
new AvroSchemaConverter().convert(schema), schema, Option.of(filter));
-HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
+HoodieParquetConfig<HoodieAvroWriteSupport> config = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()));
try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter<>(
@@ -22,6 +22,7 @@
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.table.HoodieTable;

import org.apache.flink.table.types.logical.RowType;
@@ -67,7 +68,7 @@ private static HoodieRowDataFileWriter newParquetInternalRowFileWriter(
HoodieRowDataParquetWriteSupport writeSupport =
new HoodieRowDataParquetWriteSupport(table.getHadoopConf(), rowType, filter);
return new HoodieRowDataParquetWriter(
-path, new HoodieRowDataParquetConfig(
+path, new HoodieParquetConfig<>(
writeSupport,
writeConfig.getParquetCompressionCodec(),
writeConfig.getParquetBlockSize(),

This file was deleted.

@@ -23,6 +23,7 @@

import org.apache.flink.table.data.RowData;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;

@@ -39,7 +40,7 @@ public class HoodieRowDataParquetWriter extends ParquetWriter<RowData>
private final long maxFileSize;
private final HoodieRowDataParquetWriteSupport writeSupport;

-public HoodieRowDataParquetWriter(Path file, HoodieRowDataParquetConfig parquetConfig)
+public HoodieRowDataParquetWriter(Path file, HoodieParquetConfig<HoodieRowDataParquetWriteSupport> parquetConfig)
throws IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(),
@@ -23,6 +23,7 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.fs.Path;
@@ -68,7 +69,8 @@ private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter(
HoodieRowParquetWriteSupport writeSupport =
new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter, writeConfig);
return new HoodieInternalRowParquetWriter(
-path, new HoodieRowParquetConfig(
+path,
+new HoodieParquetConfig<>(
writeSupport,
writeConfig.getParquetCompressionCodec(),
writeConfig.getParquetBlockSize(),
@@ -95,13 +97,15 @@ private static HoodieInternalRowFileWriter newParquetInternalRowFileWriterWithou
HoodieRowParquetWriteSupport writeSupport =
new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, null, writeConfig);
return new HoodieInternalRowParquetWriter(
-path, new HoodieRowParquetConfig(
+path, new HoodieParquetConfig<>(
writeSupport,
writeConfig.getParquetCompressionCodec(),
writeConfig.getParquetBlockSize(),
writeConfig.getParquetPageSize(),
writeConfig.getParquetMaxFileSize(),
writeSupport.getHadoopConf(),
-writeConfig.getParquetCompressionRatio()));
+writeConfig.getParquetCompressionRatio(),
+writeConfig.parquetDictionaryEnabled())
+);
}
}
@@ -19,6 +19,7 @@
package org.apache.hudi.io.storage.row;

import org.apache.hadoop.fs.Path;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
import org.apache.spark.sql.catalyst.InternalRow;

@@ -32,7 +33,7 @@ public class HoodieInternalRowParquetWriter extends HoodieBaseParquetWriter<InternalRow>

private final HoodieRowParquetWriteSupport writeSupport;

-public HoodieInternalRowParquetWriter(Path file, HoodieRowParquetConfig parquetConfig)
+public HoodieInternalRowParquetWriter(Path file, HoodieParquetConfig<HoodieRowParquetWriteSupport> parquetConfig)
throws IOException {
super(file, parquetConfig);


This file was deleted.

@@ -18,21 +18,20 @@

package org.apache.hudi.common.table.log.block;

+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetReaderIterator;
-import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.io.storage.HoodieParquetStreamWriter;

-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -43,7 +42,6 @@
import org.apache.parquet.io.InputFile;

import javax.annotation.Nonnull;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
@@ -97,8 +95,8 @@ protected byte[] serializeRecords(List<IndexedRecord> records) throws IOException
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
new AvroSchemaConverter().convert(writerSchema), writerSchema, Option.empty());

-HoodieAvroParquetConfig avroParquetConfig =
-new HoodieAvroParquetConfig(
+HoodieParquetConfig<HoodieAvroWriteSupport> avroParquetConfig =
+new HoodieParquetConfig<>(
writeSupport,
compressionCodecName.get(),
ParquetWriter.DEFAULT_BLOCK_SIZE,

This file was deleted.

@@ -25,7 +25,7 @@
* Base ParquetConfig to hold config params for writing to Parquet.
* @param <T>
*/
-public class HoodieBaseParquetConfig<T> {
+public class HoodieParquetConfig<T> {
private final T writeSupport;
private final CompressionCodecName compressionCodecName;
private final int blockSize;
@@ -35,13 +35,13 @@ public class HoodieBaseParquetConfig<T> {
private final double compressionRatio;
private final boolean dictionaryEnabled;

-public HoodieBaseParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize,
-int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio) {
+public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize,
+int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio) {
this(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio, false);
}

-public HoodieBaseParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize,
-int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio, boolean dictionaryEnabled) {
+public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize,
+int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio, boolean dictionaryEnabled) {
this.writeSupport = writeSupport;
this.compressionCodecName = compressionCodecName;
this.blockSize = blockSize;
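
With the subclasses gone, the two constructors above are the entire surface of the class; the seven-argument form simply delegates with `dictionaryEnabled` set to `false`. The bounded wildcard shown earlier in `HoodieBaseParquetWriter` is what lets a single writer hierarchy consume any parametrization. A small sketch of that shape — the helper is hypothetical, but `getWriteSupport()` appears in the writer hunks above:

```java
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.parquet.hadoop.api.WriteSupport;

public class ConfigAccessSketch {
  // Accepts HoodieParquetConfig<HoodieAvroWriteSupport> as well as the Spark and
  // Flink row-based configs, since each write support extends WriteSupport<R>
  // for its engine's record type R.
  static <R> WriteSupport<R> writeSupportOf(HoodieParquetConfig<? extends WriteSupport<R>> config) {
    return config.getWriteSupport();
  }
}
```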