From c3c7ef00c2ee5f47f357ba18063c43ad231d16e1 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Wed, 22 Jun 2016 00:36:57 -0700 Subject: [PATCH] PARQUET-636: Expose selection for different encodings Yet we still only support PLAIN encoding. Will implement the other encodings in separate PRs to not have huge changesets. Author: Uwe L. Korn Closes #122 from xhochy/parquet-636 and squashes the following commits: b98a575 [Uwe L. Korn] Lint fixes 37f1b7b [Uwe L. Korn] Add comment to describe the column default/specific vars 7ef8b12 [Uwe L. Korn] PARQUET-636: Expose selection for different encodings --- cpp/src/parquet/column/column-io-benchmark.cc | 2 +- cpp/src/parquet/column/column-writer-test.cc | 59 +++++++++++++++---- cpp/src/parquet/column/properties.h | 57 ++++++++++++++---- cpp/src/parquet/column/writer.cc | 33 +++++++---- cpp/src/parquet/column/writer.h | 6 +- cpp/src/parquet/file/writer-internal.cc | 19 +++--- 6 files changed, 126 insertions(+), 50 deletions(-) diff --git a/cpp/src/parquet/column/column-io-benchmark.cc b/cpp/src/parquet/column/column-io-benchmark.cc index 3bc258219a47f..10272b2b12fd0 100644 --- a/cpp/src/parquet/column/column-io-benchmark.cc +++ b/cpp/src/parquet/column/column-io-benchmark.cc @@ -35,7 +35,7 @@ std::unique_ptr BuildWriter(int64_t output_size, OutputStream* dst, std::unique_ptr pager( new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata)); return std::unique_ptr( - new Int64Writer(schema, std::move(pager), output_size)); + new Int64Writer(schema, std::move(pager), output_size, Encoding::PLAIN)); } std::shared_ptr Int64Schema(Repetition::type repetition) { diff --git a/cpp/src/parquet/column/column-writer-test.cc b/cpp/src/parquet/column/column-writer-test.cc index 5d89c1ae68d52..f78ab5a2371b3 100644 --- a/cpp/src/parquet/column/column-writer-test.cc +++ b/cpp/src/parquet/column/column-writer-test.cc @@ -84,12 +84,12 @@ class TestPrimitiveWriter : public ::testing::Test { } std::unique_ptr> BuildWriter( - int64_t output_size = SMALL_SIZE) { + int64_t output_size = SMALL_SIZE, Encoding::type encoding = Encoding::PLAIN) { sink_.reset(new InMemoryOutputStream()); std::unique_ptr pager( new SerializedPageWriter(sink_.get(), Compression::UNCOMPRESSED, &metadata_)); - return std::unique_ptr>( - new TypedColumnWriter(schema_.get(), std::move(pager), output_size)); + return std::unique_ptr>(new TypedColumnWriter( + schema_.get(), std::move(pager), output_size, encoding)); } void SyncValuesOut(); @@ -100,6 +100,20 @@ class TestPrimitiveWriter : public ::testing::Test { SyncValuesOut(); } + void TestRequiredWithEncoding(Encoding::type encoding) { + this->GenerateData(SMALL_SIZE); + + // Test case 1: required and non-repeated, so no definition or repetition levels + std::unique_ptr> writer = + this->BuildWriter(SMALL_SIZE, encoding); + writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); + writer->Close(); + + this->ReadColumn(); + ASSERT_EQ(SMALL_SIZE, this->values_read_); + ASSERT_EQ(this->values_, this->values_out_); + } + protected: int64_t values_read_; // Keep the reader alive as for ByteArray the lifetime of the ByteArray @@ -172,18 +186,39 @@ typedef ::testing::TypesGenerateData(SMALL_SIZE); +TYPED_TEST(TestPrimitiveWriter, RequiredPlain) { + this->TestRequiredWithEncoding(Encoding::PLAIN); +} - // Test case 1: required and non-repeated, so no definition or repetition levels - std::unique_ptr> writer = this->BuildWriter(); - writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); - writer->Close(); +/* +TYPED_TEST(TestPrimitiveWriter, RequiredDictionary) { + this->TestRequiredWithEncoding(Encoding::PLAIN_DICTIONARY); +} - this->ReadColumn(); - ASSERT_EQ(SMALL_SIZE, this->values_read_); - ASSERT_EQ(this->values_, this->values_out_); +TYPED_TEST(TestPrimitiveWriter, RequiredRLE) { + this->TestRequiredWithEncoding(Encoding::RLE); +} + +TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) { + this->TestRequiredWithEncoding(Encoding::BIT_PACKED); +} + +TYPED_TEST(TestPrimitiveWriter, RequiredDeltaBinaryPacked) { + this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED); +} + +TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) { + this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY); +} + +TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) { + this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY); +} + +TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) { + this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY); } +*/ TYPED_TEST(TestPrimitiveWriter, Optional) { // Optional and non-repeated, with definition levels diff --git a/cpp/src/parquet/column/properties.h b/cpp/src/parquet/column/properties.h index ee74290943a29..42962296297ae 100644 --- a/cpp/src/parquet/column/properties.h +++ b/cpp/src/parquet/column/properties.h @@ -22,10 +22,10 @@ #include #include -#include "parquet/util/input.h" -#include "parquet/util/mem-allocator.h" #include "parquet/types.h" #include "parquet/schema/types.h" +#include "parquet/util/input.h" +#include "parquet/util/mem-allocator.h" namespace parquet { @@ -79,10 +79,10 @@ ReaderProperties default_reader_properties(); static int64_t DEFAULT_PAGE_SIZE = 1024 * 1024; static int64_t DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE; static bool DEFAULT_IS_DICTIONARY_ENABLED = true; +static Encoding::type DEFAULT_ENCODING = Encoding::PLAIN; static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION = ParquetVersion::PARQUET_1_0; -static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = - Compression::UNCOMPRESSED; +static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED; using ColumnCodecs = std::unordered_map; @@ -94,6 +94,7 @@ class WriterProperties { : allocator_(default_allocator()), dictionary_enabled_(DEFAULT_IS_DICTIONARY_ENABLED), dictionary_pagesize_(DEFAULT_DICTIONARY_PAGE_SIZE), + default_encoding_(DEFAULT_ENCODING), pagesize_(DEFAULT_PAGE_SIZE), version_(DEFAULT_WRITER_VERSION), default_codec_(DEFAULT_COMPRESSION_TYPE) {} @@ -124,6 +125,21 @@ class WriterProperties { return this; } + Builder* encoding( + const std::shared_ptr& path, Encoding::type encoding_type) { + return encoding(path->ToDotString(), encoding_type); + } + + Builder* encoding(const std::string& column_path, Encoding::type encoding_type) { + encodings_[column_path] = encoding_type; + return this; + } + + Builder* encoding(Encoding::type encoding_type) { + default_encoding_ = encoding_type; + return this; + } + Builder* version(ParquetVersion::type version) { version_ = version; return this; @@ -139,14 +155,14 @@ class WriterProperties { return this; } - Builder* compression(const std::shared_ptr& path, - Compression::type codec) { + Builder* compression( + const std::shared_ptr& path, Compression::type codec) { return this->compression(path->ToDotString(), codec); } std::shared_ptr build() { - return std::shared_ptr(new WriterProperties( - allocator_, dictionary_enabled_, dictionary_pagesize_, + return std::shared_ptr(new WriterProperties(allocator_, + dictionary_enabled_, dictionary_pagesize_, default_encoding_, encodings_, pagesize_, version_, default_codec_, codecs_)); } @@ -154,8 +170,14 @@ class WriterProperties { MemoryAllocator* allocator_; bool dictionary_enabled_; int64_t dictionary_pagesize_; + // Encoding used for each column if not a specialized one is defined as + // part of encodings_ + Encoding::type default_encoding_; + std::unordered_map encodings_; int64_t pagesize_; ParquetVersion::type version_; + // Default compression codec. This will be used for all columns that do + // not have a specific codec set as part of codecs_ Compression::type default_codec_; ColumnCodecs codecs_; }; @@ -170,20 +192,29 @@ class WriterProperties { ParquetVersion::type version() const { return parquet_version_; } + Encoding::type encoding(const std::shared_ptr& path) const { + auto it = encodings_.find(path->ToDotString()); + if (it != encodings_.end()) { return it->second; } + return default_encoding_; + } + Compression::type compression(const std::shared_ptr& path) const { auto it = codecs_.find(path->ToDotString()); - if (it != codecs_.end()) - return it->second; + if (it != codecs_.end()) return it->second; return default_codec_; } private: explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled, - int64_t dictionary_pagesize, int64_t pagesize, ParquetVersion::type version, - Compression::type default_codec, const ColumnCodecs& codecs) + int64_t dictionary_pagesize, Encoding::type default_encoding, + const std::unordered_map& encodings, int64_t pagesize, + ParquetVersion::type version, Compression::type default_codec, + const ColumnCodecs& codecs) : allocator_(allocator), dictionary_enabled_(dictionary_enabled), dictionary_pagesize_(dictionary_pagesize), + default_encoding_(default_encoding), + encodings_(encodings), pagesize_(pagesize), parquet_version_(version), default_codec_(default_codec), @@ -195,6 +226,8 @@ class WriterProperties { MemoryAllocator* allocator_; bool dictionary_enabled_; int64_t dictionary_pagesize_; + Encoding::type default_encoding_; + std::unordered_map encodings_; int64_t pagesize_; ParquetVersion::type parquet_version_; Compression::type default_codec_; diff --git a/cpp/src/parquet/column/writer.cc b/cpp/src/parquet/column/writer.cc index 8856f51d992f3..482265e01d2e5 100644 --- a/cpp/src/parquet/column/writer.cc +++ b/cpp/src/parquet/column/writer.cc @@ -128,11 +128,17 @@ int64_t ColumnWriter::Close() { template TypedColumnWriter::TypedColumnWriter(const ColumnDescriptor* schema, - std::unique_ptr pager, int64_t expected_rows, MemoryAllocator* allocator) + std::unique_ptr pager, int64_t expected_rows, Encoding::type encoding, + MemoryAllocator* allocator) : ColumnWriter(schema, std::move(pager), expected_rows, allocator) { - // TODO(PARQUET-590) Get decoder type from WriterProperties - current_encoder_ = - std::unique_ptr(new PlainEncoder(schema, allocator)); + switch (encoding) { + case Encoding::PLAIN: + current_encoder_ = + std::unique_ptr(new PlainEncoder(schema, allocator)); + break; + default: + ParquetException::NYI("Selected encoding is not supported"); + } } // ---------------------------------------------------------------------- @@ -140,32 +146,33 @@ TypedColumnWriter::TypedColumnWriter(const ColumnDescriptor* schema, std::shared_ptr ColumnWriter::Make(const ColumnDescriptor* descr, std::unique_ptr pager, int64_t expected_rows, - MemoryAllocator* allocator) { + const WriterProperties* properties) { + Encoding::type encoding = properties->encoding(descr->path()); switch (descr->physical_type()) { case Type::BOOLEAN: return std::make_shared( - descr, std::move(pager), expected_rows, allocator); + descr, std::move(pager), expected_rows, encoding, properties->allocator()); case Type::INT32: return std::make_shared( - descr, std::move(pager), expected_rows, allocator); + descr, std::move(pager), expected_rows, encoding, properties->allocator()); case Type::INT64: return std::make_shared( - descr, std::move(pager), expected_rows, allocator); + descr, std::move(pager), expected_rows, encoding, properties->allocator()); case Type::INT96: return std::make_shared( - descr, std::move(pager), expected_rows, allocator); + descr, std::move(pager), expected_rows, encoding, properties->allocator()); case Type::FLOAT: return std::make_shared( - descr, std::move(pager), expected_rows, allocator); + descr, std::move(pager), expected_rows, encoding, properties->allocator()); case Type::DOUBLE: return std::make_shared( - descr, std::move(pager), expected_rows, allocator); + descr, std::move(pager), expected_rows, encoding, properties->allocator()); case Type::BYTE_ARRAY: return std::make_shared( - descr, std::move(pager), expected_rows, allocator); + descr, std::move(pager), expected_rows, encoding, properties->allocator()); case Type::FIXED_LEN_BYTE_ARRAY: return std::make_shared( - descr, std::move(pager), expected_rows, allocator); + descr, std::move(pager), expected_rows, encoding, properties->allocator()); default: ParquetException::NYI("type reader not implemented"); } diff --git a/cpp/src/parquet/column/writer.h b/cpp/src/parquet/column/writer.h index 24f647a48eb48..93c66a4fe091c 100644 --- a/cpp/src/parquet/column/writer.h +++ b/cpp/src/parquet/column/writer.h @@ -20,6 +20,7 @@ #include "parquet/column/levels.h" #include "parquet/column/page.h" +#include "parquet/column/properties.h" #include "parquet/encodings/encoder.h" #include "parquet/schema/descriptor.h" #include "parquet/types.h" @@ -35,7 +36,7 @@ class ColumnWriter { static std::shared_ptr Make(const ColumnDescriptor*, std::unique_ptr, int64_t expected_rows, - MemoryAllocator* allocator = default_allocator()); + const WriterProperties* properties); Type::type type() const { return descr_->physical_type(); } @@ -103,7 +104,8 @@ class TypedColumnWriter : public ColumnWriter { typedef typename DType::c_type T; TypedColumnWriter(const ColumnDescriptor* schema, std::unique_ptr pager, - int64_t expected_rows, MemoryAllocator* allocator = default_allocator()); + int64_t expected_rows, Encoding::type encoding, + MemoryAllocator* allocator = default_allocator()); // Write a batch of repetition levels, definition levels, and values to the // column. diff --git a/cpp/src/parquet/file/writer-internal.cc b/cpp/src/parquet/file/writer-internal.cc index a90f28ba60910..e0b1f66f6798f 100644 --- a/cpp/src/parquet/file/writer-internal.cc +++ b/cpp/src/parquet/file/writer-internal.cc @@ -78,10 +78,10 @@ int64_t SerializedPageWriter::WriteDataPage(int32_t num_rows, int32_t num_values std::shared_ptr compressed_data = uncompressed_data; if (compressor_) { const uint8_t* uncompressed_ptr = uncompressed_data->data(); - int64_t max_compressed_size = compressor_->MaxCompressedLen( - uncompressed_size, uncompressed_ptr); - compressed_data = std::make_shared(max_compressed_size, - allocator_); + int64_t max_compressed_size = + compressor_->MaxCompressedLen(uncompressed_size, uncompressed_ptr); + compressed_data = + std::make_shared(max_compressed_size, allocator_); compressed_size = compressor_->Compress(uncompressed_size, uncompressed_ptr, max_compressed_size, compressed_data->mutable_data()); } @@ -142,10 +142,10 @@ ColumnWriter* RowGroupSerializer::NextColumn() { col_meta->__isset.meta_data = true; col_meta->meta_data.__set_type(ToThrift(column_descr->physical_type())); col_meta->meta_data.__set_path_in_schema(column_descr->path()->ToDotVector()); - std::unique_ptr pager(new SerializedPageWriter(sink_, - properties_->compression(column_descr->path()), col_meta, allocator_)); + std::unique_ptr pager(new SerializedPageWriter( + sink_, properties_->compression(column_descr->path()), col_meta, allocator_)); current_column_writer_ = - ColumnWriter::Make(column_descr, std::move(pager), num_rows_, allocator_); + ColumnWriter::Make(column_descr, std::move(pager), num_rows_, properties_); return current_column_writer_.get(); } @@ -214,9 +214,8 @@ RowGroupWriter* FileSerializer::AppendRowGroup(int64_t num_rows) { auto rgm_size = row_group_metadata_.size(); row_group_metadata_.resize(rgm_size + 1); format::RowGroup* rg_metadata = &row_group_metadata_.data()[rgm_size]; - std::unique_ptr contents( - new RowGroupSerializer(num_rows, &schema_, sink_.get(), - rg_metadata, properties().get())); + std::unique_ptr contents(new RowGroupSerializer( + num_rows, &schema_, sink_.get(), rg_metadata, properties_.get())); row_group_writer_.reset(new RowGroupWriter(std::move(contents), allocator_)); return row_group_writer_.get(); }