Skip to content

Commit

Permalink
PARQUET-636: Expose selection for different encodings
Browse files Browse the repository at this point in the history
Only the PLAIN encoding is supported so far; the other encodings will be implemented in separate PRs to keep the changesets small.

Author: Uwe L. Korn <[email protected]>

Closes apache#122 from xhochy/parquet-636 and squashes the following commits:

b98a575 [Uwe L. Korn] Lint fixes
37f1b7b [Uwe L. Korn] Add comment to describe the column default/specific vars
7ef8b12 [Uwe L. Korn] PARQUET-636: Expose selection for different encodings
  • Loading branch information
xhochy authored and wesm committed Sep 2, 2018
1 parent 24a5191 commit c3c7ef0
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 50 deletions.
2 changes: 1 addition & 1 deletion cpp/src/parquet/column/column-io-benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst,
std::unique_ptr<SerializedPageWriter> pager(
new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata));
return std::unique_ptr<Int64Writer>(
new Int64Writer(schema, std::move(pager), output_size));
new Int64Writer(schema, std::move(pager), output_size, Encoding::PLAIN));
}

std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
Expand Down
59 changes: 47 additions & 12 deletions cpp/src/parquet/column/column-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ class TestPrimitiveWriter : public ::testing::Test {
}

std::unique_ptr<TypedColumnWriter<TestType>> BuildWriter(
int64_t output_size = SMALL_SIZE) {
int64_t output_size = SMALL_SIZE, Encoding::type encoding = Encoding::PLAIN) {
sink_.reset(new InMemoryOutputStream());
std::unique_ptr<SerializedPageWriter> pager(
new SerializedPageWriter(sink_.get(), Compression::UNCOMPRESSED, &metadata_));
return std::unique_ptr<TypedColumnWriter<TestType>>(
new TypedColumnWriter<TestType>(schema_.get(), std::move(pager), output_size));
return std::unique_ptr<TypedColumnWriter<TestType>>(new TypedColumnWriter<TestType>(
schema_.get(), std::move(pager), output_size, encoding));
}

void SyncValuesOut();
Expand All @@ -100,6 +100,20 @@ class TestPrimitiveWriter : public ::testing::Test {
SyncValuesOut();
}

// Generates SMALL_SIZE values, writes them in one batch through a writer built
// with the given encoding, reads the column back and asserts that
// values_read_ equals SMALL_SIZE and values_out_ equals values_.
void TestRequiredWithEncoding(Encoding::type encoding) {
  this->GenerateData(SMALL_SIZE);

  // Required, non-repeated column: both level pointers are passed as nullptr.
  auto writer = this->BuildWriter(SMALL_SIZE, encoding);
  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
  writer->Close();

  // Read back what was written and compare it against the generated input.
  this->ReadColumn();
  ASSERT_EQ(SMALL_SIZE, this->values_read_);
  ASSERT_EQ(this->values_, this->values_out_);
}

protected:
int64_t values_read_;
// Keep the reader alive as for ByteArray the lifetime of the ByteArray
Expand Down Expand Up @@ -172,18 +186,39 @@ typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,

TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);

TYPED_TEST(TestPrimitiveWriter, Required) {
this->GenerateData(SMALL_SIZE);
// Runs TestRequiredWithEncoding with Encoding::PLAIN for each type the
// TestPrimitiveWriter typed test case is instantiated with.
TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
this->TestRequiredWithEncoding(Encoding::PLAIN);
}

// Test case 1: required and non-repeated, so no definition or repetition levels
std::unique_ptr<TypedColumnWriter<TypeParam>> writer = this->BuildWriter();
writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
writer->Close();
/*
TYPED_TEST(TestPrimitiveWriter, RequiredDictionary) {
this->TestRequiredWithEncoding(Encoding::PLAIN_DICTIONARY);
}
this->ReadColumn();
ASSERT_EQ(SMALL_SIZE, this->values_read_);
ASSERT_EQ(this->values_, this->values_out_);
TYPED_TEST(TestPrimitiveWriter, RequiredRLE) {
this->TestRequiredWithEncoding(Encoding::RLE);
}
TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) {
this->TestRequiredWithEncoding(Encoding::BIT_PACKED);
}
TYPED_TEST(TestPrimitiveWriter, RequiredDeltaBinaryPacked) {
this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
}
TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
}
TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) {
this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
}
TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY);
}
*/

TYPED_TEST(TestPrimitiveWriter, Optional) {
// Optional and non-repeated, with definition levels
Expand Down
57 changes: 45 additions & 12 deletions cpp/src/parquet/column/properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
#include <string>
#include <unordered_map>

#include "parquet/util/input.h"
#include "parquet/util/mem-allocator.h"
#include "parquet/types.h"
#include "parquet/schema/types.h"
#include "parquet/util/input.h"
#include "parquet/util/mem-allocator.h"

namespace parquet {

Expand Down Expand Up @@ -79,10 +79,10 @@ ReaderProperties default_reader_properties();
// Defaults picked up by the WriterProperties::Builder constructor. They are
// compile-time constants: a non-const `static` in a header would give every
// translation unit its own silently mutable copy.
static constexpr int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
// Encoding the WriterProperties::Builder constructor starts with as its
// default. Declared constexpr so this header-level constant cannot be
// reassigned per translation unit.
static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
ParquetVersion::PARQUET_1_0;
static constexpr Compression::type DEFAULT_COMPRESSION_TYPE =
Compression::UNCOMPRESSED;
static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;

using ColumnCodecs = std::unordered_map<std::string, Compression::type>;

Expand All @@ -94,6 +94,7 @@ class WriterProperties {
: allocator_(default_allocator()),
dictionary_enabled_(DEFAULT_IS_DICTIONARY_ENABLED),
dictionary_pagesize_(DEFAULT_DICTIONARY_PAGE_SIZE),
default_encoding_(DEFAULT_ENCODING),
pagesize_(DEFAULT_PAGE_SIZE),
version_(DEFAULT_WRITER_VERSION),
default_codec_(DEFAULT_COMPRESSION_TYPE) {}
Expand Down Expand Up @@ -124,6 +125,21 @@ class WriterProperties {
return this;
}

// Sets the encoding for the column identified by `path`. The path is turned
// into its dot-string form and handed to the string-keyed overload; returns
// this builder so calls can be chained.
Builder* encoding(
    const std::shared_ptr<schema::ColumnPath>& path, Encoding::type encoding_type) {
  const std::string column_path = path->ToDotString();
  return this->encoding(column_path, encoding_type);
}

// Stores `encoding_type` in encodings_ under the key `column_path`,
// overwriting any value already stored for that key. Returns this builder so
// calls can be chained.
Builder* encoding(const std::string& column_path, Encoding::type encoding_type) {
encodings_[column_path] = encoding_type;
return this;
}

// Sets the default encoding, i.e. the one used for columns that have no
// entry of their own in encodings_. Returns this builder so calls can be
// chained.
Builder* encoding(Encoding::type encoding_type) {
default_encoding_ = encoding_type;
return this;
}

Builder* version(ParquetVersion::type version) {
version_ = version;
return this;
Expand All @@ -139,23 +155,29 @@ class WriterProperties {
return this;
}

Builder* compression(const std::shared_ptr<schema::ColumnPath>& path,
Compression::type codec) {
Builder* compression(
const std::shared_ptr<schema::ColumnPath>& path, Compression::type codec) {
return this->compression(path->ToDotString(), codec);
}

std::shared_ptr<WriterProperties> build() {
return std::shared_ptr<WriterProperties>(new WriterProperties(
allocator_, dictionary_enabled_, dictionary_pagesize_,
return std::shared_ptr<WriterProperties>(new WriterProperties(allocator_,
dictionary_enabled_, dictionary_pagesize_, default_encoding_, encodings_,
pagesize_, version_, default_codec_, codecs_));
}

private:
MemoryAllocator* allocator_;
bool dictionary_enabled_;
int64_t dictionary_pagesize_;
// Encoding used for every column that does not have a column-specific
// encoding set in encodings_
Encoding::type default_encoding_;
std::unordered_map<std::string, Encoding::type> encodings_;
int64_t pagesize_;
ParquetVersion::type version_;
// Default compression codec. This will be used for all columns that do
// not have a specific codec set as part of codecs_
Compression::type default_codec_;
ColumnCodecs codecs_;
};
Expand All @@ -170,20 +192,29 @@ class WriterProperties {

ParquetVersion::type version() const { return parquet_version_; }

// Returns the encoding for the column at `path`: the value stored in
// encodings_ under the path's dot-string when there is one, otherwise
// default_encoding_.
Encoding::type encoding(const std::shared_ptr<schema::ColumnPath>& path) const {
  const auto found = encodings_.find(path->ToDotString());
  return (found == encodings_.end()) ? default_encoding_ : found->second;
}

Compression::type compression(const std::shared_ptr<schema::ColumnPath>& path) const {
auto it = codecs_.find(path->ToDotString());
if (it != codecs_.end())
return it->second;
if (it != codecs_.end()) return it->second;
return default_codec_;
}

private:
explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled,
int64_t dictionary_pagesize, int64_t pagesize, ParquetVersion::type version,
Compression::type default_codec, const ColumnCodecs& codecs)
int64_t dictionary_pagesize, Encoding::type default_encoding,
const std::unordered_map<std::string, Encoding::type>& encodings, int64_t pagesize,
ParquetVersion::type version, Compression::type default_codec,
const ColumnCodecs& codecs)
: allocator_(allocator),
dictionary_enabled_(dictionary_enabled),
dictionary_pagesize_(dictionary_pagesize),
default_encoding_(default_encoding),
encodings_(encodings),
pagesize_(pagesize),
parquet_version_(version),
default_codec_(default_codec),
Expand All @@ -195,6 +226,8 @@ class WriterProperties {
MemoryAllocator* allocator_;
bool dictionary_enabled_;
int64_t dictionary_pagesize_;
Encoding::type default_encoding_;
std::unordered_map<std::string, Encoding::type> encodings_;
int64_t pagesize_;
ParquetVersion::type parquet_version_;
Compression::type default_codec_;
Expand Down
33 changes: 20 additions & 13 deletions cpp/src/parquet/column/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,44 +128,51 @@ int64_t ColumnWriter::Close() {

template <typename Type>
TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* schema,
std::unique_ptr<PageWriter> pager, int64_t expected_rows, MemoryAllocator* allocator)
std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
MemoryAllocator* allocator)
: ColumnWriter(schema, std::move(pager), expected_rows, allocator) {
// TODO(PARQUET-590) Get decoder type from WriterProperties
current_encoder_ =
std::unique_ptr<EncoderType>(new PlainEncoder<Type>(schema, allocator));
switch (encoding) {
case Encoding::PLAIN:
current_encoder_ =
std::unique_ptr<EncoderType>(new PlainEncoder<Type>(schema, allocator));
break;
default:
ParquetException::NYI("Selected encoding is not supported");
}
}

// ----------------------------------------------------------------------
// Dynamic column writer constructor

std::shared_ptr<ColumnWriter> ColumnWriter::Make(const ColumnDescriptor* descr,
std::unique_ptr<PageWriter> pager, int64_t expected_rows,
MemoryAllocator* allocator) {
const WriterProperties* properties) {
Encoding::type encoding = properties->encoding(descr->path());
switch (descr->physical_type()) {
case Type::BOOLEAN:
return std::make_shared<BoolWriter>(
descr, std::move(pager), expected_rows, allocator);
descr, std::move(pager), expected_rows, encoding, properties->allocator());
case Type::INT32:
return std::make_shared<Int32Writer>(
descr, std::move(pager), expected_rows, allocator);
descr, std::move(pager), expected_rows, encoding, properties->allocator());
case Type::INT64:
return std::make_shared<Int64Writer>(
descr, std::move(pager), expected_rows, allocator);
descr, std::move(pager), expected_rows, encoding, properties->allocator());
case Type::INT96:
return std::make_shared<Int96Writer>(
descr, std::move(pager), expected_rows, allocator);
descr, std::move(pager), expected_rows, encoding, properties->allocator());
case Type::FLOAT:
return std::make_shared<FloatWriter>(
descr, std::move(pager), expected_rows, allocator);
descr, std::move(pager), expected_rows, encoding, properties->allocator());
case Type::DOUBLE:
return std::make_shared<DoubleWriter>(
descr, std::move(pager), expected_rows, allocator);
descr, std::move(pager), expected_rows, encoding, properties->allocator());
case Type::BYTE_ARRAY:
return std::make_shared<ByteArrayWriter>(
descr, std::move(pager), expected_rows, allocator);
descr, std::move(pager), expected_rows, encoding, properties->allocator());
case Type::FIXED_LEN_BYTE_ARRAY:
return std::make_shared<FixedLenByteArrayWriter>(
descr, std::move(pager), expected_rows, allocator);
descr, std::move(pager), expected_rows, encoding, properties->allocator());
default:
ParquetException::NYI("type reader not implemented");
}
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/parquet/column/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "parquet/column/levels.h"
#include "parquet/column/page.h"
#include "parquet/column/properties.h"
#include "parquet/encodings/encoder.h"
#include "parquet/schema/descriptor.h"
#include "parquet/types.h"
Expand All @@ -35,7 +36,7 @@ class ColumnWriter {

static std::shared_ptr<ColumnWriter> Make(const ColumnDescriptor*,
std::unique_ptr<PageWriter>, int64_t expected_rows,
MemoryAllocator* allocator = default_allocator());
const WriterProperties* properties);

Type::type type() const { return descr_->physical_type(); }

Expand Down Expand Up @@ -103,7 +104,8 @@ class TypedColumnWriter : public ColumnWriter {
typedef typename DType::c_type T;

TypedColumnWriter(const ColumnDescriptor* schema, std::unique_ptr<PageWriter> pager,
int64_t expected_rows, MemoryAllocator* allocator = default_allocator());
int64_t expected_rows, Encoding::type encoding,
MemoryAllocator* allocator = default_allocator());

// Write a batch of repetition levels, definition levels, and values to the
// column.
Expand Down
19 changes: 9 additions & 10 deletions cpp/src/parquet/file/writer-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ int64_t SerializedPageWriter::WriteDataPage(int32_t num_rows, int32_t num_values
std::shared_ptr<OwnedMutableBuffer> compressed_data = uncompressed_data;
if (compressor_) {
const uint8_t* uncompressed_ptr = uncompressed_data->data();
int64_t max_compressed_size = compressor_->MaxCompressedLen(
uncompressed_size, uncompressed_ptr);
compressed_data = std::make_shared<OwnedMutableBuffer>(max_compressed_size,
allocator_);
int64_t max_compressed_size =
compressor_->MaxCompressedLen(uncompressed_size, uncompressed_ptr);
compressed_data =
std::make_shared<OwnedMutableBuffer>(max_compressed_size, allocator_);
compressed_size = compressor_->Compress(uncompressed_size, uncompressed_ptr,
max_compressed_size, compressed_data->mutable_data());
}
Expand Down Expand Up @@ -142,10 +142,10 @@ ColumnWriter* RowGroupSerializer::NextColumn() {
col_meta->__isset.meta_data = true;
col_meta->meta_data.__set_type(ToThrift(column_descr->physical_type()));
col_meta->meta_data.__set_path_in_schema(column_descr->path()->ToDotVector());
std::unique_ptr<PageWriter> pager(new SerializedPageWriter(sink_,
properties_->compression(column_descr->path()), col_meta, allocator_));
std::unique_ptr<PageWriter> pager(new SerializedPageWriter(
sink_, properties_->compression(column_descr->path()), col_meta, allocator_));
current_column_writer_ =
ColumnWriter::Make(column_descr, std::move(pager), num_rows_, allocator_);
ColumnWriter::Make(column_descr, std::move(pager), num_rows_, properties_);
return current_column_writer_.get();
}

Expand Down Expand Up @@ -214,9 +214,8 @@ RowGroupWriter* FileSerializer::AppendRowGroup(int64_t num_rows) {
auto rgm_size = row_group_metadata_.size();
row_group_metadata_.resize(rgm_size + 1);
format::RowGroup* rg_metadata = &row_group_metadata_.data()[rgm_size];
std::unique_ptr<RowGroupWriter::Contents> contents(
new RowGroupSerializer(num_rows, &schema_, sink_.get(),
rg_metadata, properties().get()));
std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer(
num_rows, &schema_, sink_.get(), rg_metadata, properties_.get()));
row_group_writer_.reset(new RowGroupWriter(std::move(contents), allocator_));
return row_group_writer_.get();
}
Expand Down

0 comments on commit c3c7ef0

Please sign in to comment.