diff --git a/src/parquet/column/CMakeLists.txt b/src/parquet/column/CMakeLists.txt index 99b4ed210..fa1ce7afc 100644 --- a/src/parquet/column/CMakeLists.txt +++ b/src/parquet/column/CMakeLists.txt @@ -25,3 +25,4 @@ install(FILES ADD_PARQUET_TEST(column-reader-test) ADD_PARQUET_TEST(levels-test) +ADD_PARQUET_TEST(scanner-test) diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc index 3a2bbd8df..079201a9f 100644 --- a/src/parquet/column/column-reader-test.cc +++ b/src/parquet/column/column-reader-test.cc @@ -44,195 +44,132 @@ namespace test { class TestPrimitiveReader : public ::testing::Test { public: - void SetUp() {} - - void TearDown() {} + void MakePages(const ColumnDescriptor *d, int num_pages, int levels_per_page) { + num_levels_ = levels_per_page * num_pages; + num_values_ = 0; + uint32_t seed = 0; + int16_t zero = 0; + vector values_per_page(num_pages, levels_per_page); + // Create definition levels + if (max_def_level_ > 0) { + def_levels_.resize(num_levels_); + random_numbers(num_levels_, seed, zero, max_def_level_, def_levels_.data()); + for (int p = 0; p < num_pages; p++) { + int num_values_per_page = 0; + for (int i = 0; i < levels_per_page; i++) { + if (def_levels_[i + p * levels_per_page] == max_def_level_) { + num_values_per_page++; + num_values_++; + } + } + values_per_page[p] = num_values_per_page; + } + } else { + num_values_ = num_levels_; + } + // Create repitition levels + if (max_rep_level_ > 0) { + rep_levels_.resize(num_levels_); + random_numbers(num_levels_, seed, zero, max_rep_level_, rep_levels_.data()); + } + // Create values + values_.resize(num_values_); + random_numbers(num_values_, seed, std::numeric_limits::min(), + std::numeric_limits::max(), values_.data()); + Paginate(d, values_, def_levels_, max_def_level_, + rep_levels_, max_rep_level_, levels_per_page, values_per_page, pages_); + } - void InitReader(const ColumnDescriptor* descr) { + void InitReader(const ColumnDescriptor* d) { + std::unique_ptr pager_; pager_.reset(new test::MockPageReader(pages_)); - reader_ = ColumnReader::Make(descr, std::move(pager_)); + reader_ = ColumnReader::Make(d, std::move(pager_)); + } + + void CheckResults() { + vector vresult(num_values_, -1); + vector dresult(num_levels_, -1); + vector rresult(num_levels_, -1); + size_t values_read = 0; + size_t total_values_read = 0; + size_t batch_actual = 0; + + Int32Reader* reader = static_cast(reader_.get()); + int32_t batch_size = 8; + size_t batch = 0; + // This will cover both the cases + // 1) batch_size < page_size (multiple ReadBatch from a single page) + // 2) batch_size > page_size (BatchRead limits to a single page) + do { + batch = reader->ReadBatch(batch_size, &dresult[0] + batch_actual, + &rresult[0] + batch_actual, &vresult[0] + total_values_read, &values_read); + total_values_read += values_read; + batch_actual += batch; + batch_size = std::max(batch_size * 2, 4096); + } while (batch > 0); + + ASSERT_EQ(num_levels_, batch_actual); + ASSERT_EQ(num_values_, total_values_read); + ASSERT_TRUE(vector_equal(values_, vresult)); + if (max_def_level_ > 0) { + ASSERT_TRUE(vector_equal(def_levels_, dresult)); + } + if (max_rep_level_ > 0) { + ASSERT_TRUE(vector_equal(rep_levels_, rresult)); + } + // catch improper writes at EOS + batch_actual = reader->ReadBatch(5, nullptr, nullptr, nullptr, &values_read); + ASSERT_EQ(0, batch_actual); + ASSERT_EQ(0, values_read); + } + + void execute(int num_pages, int levels_page, const ColumnDescriptor *d) { + MakePages(d, num_pages, levels_page); + InitReader(d); + CheckResults(); } protected: - std::shared_ptr reader_; - std::unique_ptr pager_; + int num_levels_; + int num_values_; + int16_t max_def_level_; + int16_t max_rep_level_; vector > pages_; + std::shared_ptr reader_; + vector values_; + vector def_levels_; + vector rep_levels_; }; - TEST_F(TestPrimitiveReader, TestInt32FlatRequired) { - vector values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; - - std::shared_ptr page = MakeDataPage(values, {}, 0, - {}, 0); - pages_.push_back(page); - + int levels_per_page = 100; + int num_pages = 50; + max_def_level_ = 0; + max_rep_level_ = 0; NodePtr type = schema::Int32("a", Repetition::REQUIRED); - ColumnDescriptor descr(type, 0, 0); - InitReader(&descr); - - Int32Reader* reader = static_cast(reader_.get()); - - vector result(10, -1); - - size_t values_read = 0; - size_t batch_actual = reader->ReadBatch(10, nullptr, nullptr, - &result[0], &values_read); - ASSERT_EQ(10, batch_actual); - ASSERT_EQ(10, values_read); - - ASSERT_TRUE(vector_equal(result, values)); + const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); + execute(num_pages, levels_per_page, &descr); } - TEST_F(TestPrimitiveReader, TestInt32FlatOptional) { - vector values = {1, 2, 3, 4, 5}; - vector def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1}; - - std::shared_ptr page = MakeDataPage(values, def_levels, 1, - {}, 0); - - pages_.push_back(page); - - NodePtr type = schema::Int32("a", Repetition::OPTIONAL); - ColumnDescriptor descr(type, 1, 0); - InitReader(&descr); - - Int32Reader* reader = static_cast(reader_.get()); - - size_t values_read = 0; - size_t batch_actual = 0; - - vector vresult(3, -1); - vector dresult(5, -1); - - batch_actual = reader->ReadBatch(5, &dresult[0], nullptr, - &vresult[0], &values_read); - ASSERT_EQ(5, batch_actual); - ASSERT_EQ(3, values_read); - - ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3))); - ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5))); - - batch_actual = reader->ReadBatch(5, &dresult[0], nullptr, - &vresult[0], &values_read); - ASSERT_EQ(5, batch_actual); - ASSERT_EQ(2, values_read); - - ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5))); - ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10))); - - // EOS, pass all nullptrs to check for improper writes. Do not segfault / - // core dump - batch_actual = reader->ReadBatch(5, nullptr, nullptr, - nullptr, &values_read); - ASSERT_EQ(0, batch_actual); - ASSERT_EQ(0, values_read); + int levels_per_page = 100; + int num_pages = 50; + max_def_level_ = 4; + max_rep_level_ = 0; + NodePtr type = schema::Int32("b", Repetition::OPTIONAL); + const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); + execute(num_pages, levels_per_page, &descr); } TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) { - vector values = {1, 2, 3, 4, 5}; - vector def_levels = {2, 1, 1, 2, 2, 1, 1, 2, 2, 1}; - vector rep_levels = {0, 1, 1, 0, 0, 1, 1, 0, 0, 1}; - - std::shared_ptr page = MakeDataPage(values, - def_levels, 2, rep_levels, 1); - - pages_.push_back(page); - - NodePtr type = schema::Int32("a", Repetition::REPEATED); - ColumnDescriptor descr(type, 2, 1); - InitReader(&descr); - - Int32Reader* reader = static_cast(reader_.get()); - - size_t values_read = 0; - size_t batch_actual = 0; - - vector vresult(3, -1); - vector dresult(5, -1); - vector rresult(5, -1); - - batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0], - &vresult[0], &values_read); - ASSERT_EQ(5, batch_actual); - ASSERT_EQ(3, values_read); - - ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3))); - ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5))); - ASSERT_TRUE(vector_equal(rresult, slice(rep_levels, 0, 5))); - - batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0], - &vresult[0], &values_read); - ASSERT_EQ(5, batch_actual); - ASSERT_EQ(2, values_read); - - ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5))); - ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10))); - ASSERT_TRUE(vector_equal(rresult, slice(rep_levels, 5, 10))); - - // EOS, pass all nullptrs to check for improper writes. Do not segfault / - // core dump - batch_actual = reader->ReadBatch(5, nullptr, nullptr, - nullptr, &values_read); - ASSERT_EQ(0, batch_actual); - ASSERT_EQ(0, values_read); + int levels_per_page = 100; + int num_pages = 50; + max_def_level_ = 4; + max_rep_level_ = 2; + NodePtr type = schema::Int32("c", Repetition::REPEATED); + const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); + execute(num_pages, levels_per_page, &descr); } -TEST_F(TestPrimitiveReader, TestInt32FlatRepeatedMultiplePages) { - vector values[2] = {{1, 2, 3, 4, 5}, - {6, 7, 8, 9, 10}}; - vector def_levels[2] = {{2, 1, 1, 2, 2, 1, 1, 2, 2, 1}, - {2, 2, 1, 2, 1, 1, 2, 1, 2, 1}}; - vector rep_levels[2] = {{0, 1, 1, 0, 0, 1, 1, 0, 0, 1}, - {0, 0, 1, 0, 1, 1, 0, 1, 0, 1}}; - - std::shared_ptr page; - - for (int i = 0; i < 4; i++) { - page = MakeDataPage(values[i % 2], - def_levels[i % 2], 2, rep_levels[i % 2], 1); - pages_.push_back(page); - } - - NodePtr type = schema::Int32("a", Repetition::REPEATED); - ColumnDescriptor descr(type, 2, 1); - InitReader(&descr); - - Int32Reader* reader = static_cast(reader_.get()); - - size_t values_read = 0; - size_t batch_actual = 0; - - vector vresult(3, -1); - vector dresult(5, -1); - vector rresult(5, -1); - - for (int i = 0; i < 4; i++) { - batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0], - &vresult[0], &values_read); - ASSERT_EQ(5, batch_actual); - ASSERT_EQ(3, values_read); - - ASSERT_TRUE(vector_equal(vresult, slice(values[i % 2], 0, 3))); - ASSERT_TRUE(vector_equal(dresult, slice(def_levels[i % 2], 0, 5))); - ASSERT_TRUE(vector_equal(rresult, slice(rep_levels[i % 2], 0, 5))); - - batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0], - &vresult[0], &values_read); - ASSERT_EQ(5, batch_actual); - ASSERT_EQ(2, values_read); - - ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values[i % 2], 3, 5))); - ASSERT_TRUE(vector_equal(dresult, slice(def_levels[i % 2], 5, 10))); - ASSERT_TRUE(vector_equal(rresult, slice(rep_levels[i % 2], 5, 10))); - } - // EOS, pass all nullptrs to check for improper writes. Do not segfault / - // core dump - batch_actual = reader->ReadBatch(5, nullptr, nullptr, - nullptr, &values_read); - ASSERT_EQ(0, batch_actual); - ASSERT_EQ(0, values_read); -} } // namespace test } // namespace parquet_cpp diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h index d11a13c84..dc23dd915 100644 --- a/src/parquet/column/reader.h +++ b/src/parquet/column/reader.h @@ -122,7 +122,7 @@ class TypedColumnReader : public ColumnReader { // This API is the same for both V1 and V2 of the DataPage // // @returns: actual number of levels read (see values_read for number of values read) - size_t ReadBatch(int batch_size, int16_t* def_levels, int16_t* rep_levels, + size_t ReadBatch(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values, size_t* values_read); private: diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc new file mode 100644 index 000000000..be6b42ee2 --- /dev/null +++ b/src/parquet/column/scanner-test.cc @@ -0,0 +1,268 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include +#include +#include +#include +#include + +#include "parquet/types.h" +#include "parquet/column/page.h" +#include "parquet/column/scanner.h" +#include "parquet/column/test-util.h" +#include "parquet/schema/descriptor.h" +#include "parquet/schema/types.h" +#include "parquet/util/test-common.h" + +using std::string; +using std::vector; +using std::shared_ptr; + +namespace parquet_cpp { + +using schema::NodePtr; + +bool operator==(const Int96& a, const Int96& b) { + return a.value[0] == b.value[0] && + a.value[1] == b.value[1] && + a.value[2] == b.value[2]; +} + +bool operator==(const ByteArray& a, const ByteArray& b) { + return a.len == b.len && 0 == memcmp(a.ptr, b.ptr, a.len); +} + +static int FLBA_LENGTH = 12; +bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) { + return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH); +} + +namespace test { + +template class TypeValue { + public: + static const int value = N; +}; +template const int TypeValue::value; + +template +class TestFlatScanner : public ::testing::Test { + public: + typedef typename type_traits::value_type T; + + void InitValues() { + random_numbers(num_values_, 0, std::numeric_limits::min(), + std::numeric_limits::max(), values_.data()); + } + + void MakePages(const ColumnDescriptor *d, int num_pages, int levels_per_page) { + num_levels_ = levels_per_page * num_pages; + num_values_ = 0; + uint32_t seed = 0; + int16_t zero = 0; + int16_t max_def_level = d->max_definition_level(); + int16_t max_rep_level = d->max_repetition_level(); + vector values_per_page(num_pages, levels_per_page); + // Create definition levels + if (max_def_level > 0) { + def_levels_.resize(num_levels_); + random_numbers(num_levels_, seed, zero, max_def_level, def_levels_.data()); + for (int p = 0; p < num_pages; p++) { + int num_values_per_page = 0; + for (int i = 0; i < levels_per_page; i++) { + if (def_levels_[i + p * levels_per_page] == max_def_level) { + num_values_per_page++; + num_values_++; + } + } + values_per_page[p] = num_values_per_page; + } + } else { + num_values_ = num_levels_; + } + // Create repitition levels + if (max_rep_level > 0) { + rep_levels_.resize(num_levels_); + random_numbers(num_levels_, seed, zero, max_rep_level, rep_levels_.data()); + } + // Create values + values_.resize(num_values_); + InitValues(); + Paginate(d, values_, def_levels_, max_def_level, + rep_levels_, max_rep_level, levels_per_page, values_per_page, pages_); + } + + void InitScanner(const ColumnDescriptor *d) { + std::unique_ptr pager(new test::MockPageReader(pages_)); + scanner_ = Scanner::Make(ColumnReader::Make(d, std::move(pager))); + } + + void CheckResults(int batch_size, const ColumnDescriptor *d) { + TypedScanner* scanner = + reinterpret_cast* >(scanner_.get()); + T val; + bool is_null; + int16_t def_level; + int16_t rep_level; + size_t j = 0; + scanner->SetBatchSize(batch_size); + for (size_t i = 0; i < num_levels_; i++) { + ASSERT_TRUE(scanner->Next(&val, &def_level, &rep_level, &is_null)) << i << j; + if (!is_null) { + ASSERT_EQ(values_[j++], val) << i <<"V"<< j; + } + if (!d->is_required()) { + ASSERT_EQ(def_levels_[i], def_level) << i <<"D"<< j; + } + if (d->is_repeated()) { + ASSERT_EQ(rep_levels_[i], rep_level) << i <<"R"<< j; + } + } + ASSERT_EQ(num_values_, j); + ASSERT_FALSE(scanner->HasNext()); + } + + void Clear() { + pages_.clear(); + values_.clear(); + def_levels_.clear(); + rep_levels_.clear(); + } + + void Execute(int num_pages, int levels_page, int batch_size, + const ColumnDescriptor *d) { + MakePages(d, num_pages, levels_page); + InitScanner(d); + CheckResults(batch_size, d); + Clear(); + } + + void InitDescriptors(std::shared_ptr& d1, + std::shared_ptr& d2, std::shared_ptr& d3) { + NodePtr type; + type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, + static_cast(TYPE::value)); + d1.reset(new ColumnDescriptor(type, 0, 0)); + type = schema::PrimitiveNode::Make("c2", Repetition::OPTIONAL, + static_cast(TYPE::value)); + d2.reset(new ColumnDescriptor(type, 4, 0)); + type = schema::PrimitiveNode::Make("c3", Repetition::REPEATED, + static_cast(TYPE::value)); + d3.reset(new ColumnDescriptor(type, 4, 2)); + } + + void ExecuteAll(int num_pages, int num_levels, int batch_size) { + std::shared_ptr d1; + std::shared_ptr d2; + std::shared_ptr d3; + InitDescriptors(d1, d2, d3); + // evaluate REQUIRED pages + Execute(num_pages, num_levels, batch_size, d1.get()); + // evaluate OPTIONAL pages + Execute(num_pages, num_levels, batch_size, d2.get()); + // evaluate REPEATED pages + Execute(num_pages, num_levels, batch_size, d3.get()); + } + + protected: + int num_levels_; + int num_values_; + vector > pages_; + std::shared_ptr scanner_; + vector values_; + vector def_levels_; + vector rep_levels_; + vector data_buffer_; // For BA and FLBA +}; + +template<> +void TestFlatScanner >::InitValues() { + values_ = flip_coins(num_values_, 0); +} + +template<> +void TestFlatScanner >::InitValues() { + random_Int96_numbers(num_values_, 0, std::numeric_limits::min(), + std::numeric_limits::max(), values_.data()); +} + +template<> +void TestFlatScanner >::InitValues() { + int max_byte_array_len = 12; + int num_bytes = max_byte_array_len + sizeof(uint32_t); + size_t nbytes = num_values_ * num_bytes; + data_buffer_.resize(nbytes); + random_byte_array(num_values_, 0, data_buffer_.data(), values_.data(), + max_byte_array_len); +} + +template<> +void TestFlatScanner >::InitValues() { + size_t nbytes = num_values_ * FLBA_LENGTH; + data_buffer_.resize(nbytes); + random_fixed_byte_array(num_values_, 0, data_buffer_.data(), FLBA_LENGTH, + values_.data()); +} + +template<> +void TestFlatScanner >::InitDescriptors( + std::shared_ptr& d1, std::shared_ptr& d2, + std::shared_ptr& d3) { + NodePtr type = schema::PrimitiveNode::MakeFLBA("c1", Repetition::REQUIRED, + FLBA_LENGTH, LogicalType::UTF8); + d1.reset(new ColumnDescriptor(type, 0, 0)); + type = schema::PrimitiveNode::MakeFLBA("c2", Repetition::OPTIONAL, + FLBA_LENGTH, LogicalType::UTF8); + d2.reset(new ColumnDescriptor(type, 4, 0)); + type = schema::PrimitiveNode::MakeFLBA("c3", Repetition::REPEATED, + FLBA_LENGTH, LogicalType::UTF8); + d3.reset(new ColumnDescriptor(type, 4, 2)); +} + +typedef TestFlatScanner> TestFlatFLBAScanner; + +static int num_levels_per_page = 100; +static int num_pages = 20; +static int batch_size = 32; + +typedef ::testing::Types, TypeValue, + TypeValue, TypeValue, TypeValue, + TypeValue, TypeValue, + TypeValue > Primitives; + +TYPED_TEST_CASE(TestFlatScanner, Primitives); + +TYPED_TEST(TestFlatScanner, TestScanner) { + this->ExecuteAll(num_pages, num_levels_per_page, batch_size); +} + +//PARQUET 502 +TEST_F(TestFlatFLBAScanner, TestSmallBatch) { + NodePtr type = schema::PrimitiveNode::MakeFLBA("c1", Repetition::REQUIRED, + FLBA_LENGTH, LogicalType::UTF8); + const ColumnDescriptor d(type, 0, 0); + MakePages(&d, 1, 100); + InitScanner(&d); + CheckResults(1, &d); +} + +} // namespace test +} // namespace parquet_cpp diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h index 512f540c7..f3f5719c1 100644 --- a/src/parquet/column/scanner.h +++ b/src/parquet/column/scanner.h @@ -45,8 +45,8 @@ class Scanner { values_buffered_(0), reader_(reader) { // TODO: don't allocate for required fields - def_levels_.resize(batch_size_); - rep_levels_.resize(batch_size_); + def_levels_.resize(reader->descr()->is_optional() ? batch_size_ : 0); + rep_levels_.resize(reader->descr()->is_repeated() ? batch_size_ : 0); } virtual ~Scanner() {} @@ -57,7 +57,7 @@ class Scanner { virtual void PrintNext(std::ostream& out, int width) = 0; bool HasNext() { - return value_offset_ < values_buffered_ || reader_->HasNext(); + return level_offset_ < levels_buffered_ || reader_->HasNext(); } const ColumnDescriptor* descr() const { @@ -108,21 +108,45 @@ class TypedScanner : public Scanner { levels_buffered_ = typed_reader_->ReadBatch(batch_size_, &def_levels_[0], &rep_levels_[0], values_, &values_buffered_); - // TODO: repetition levels - + value_offset_ = 0; level_offset_ = 0; if (!levels_buffered_) { return false; } } - *def_level = def_levels_[level_offset_++]; - *rep_level = 1; + *def_level = descr()->is_optional() ? + def_levels_[level_offset_] : descr()->max_definition_level(); + *rep_level = descr()->is_repeated() ? + rep_levels_[level_offset_] : descr()->max_repetition_level(); + level_offset_++; + return true; + } + + bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) { + if (level_offset_ == levels_buffered_) { + if (!HasNext()) { + // Out of data pages + return false; + } + } + + NextLevels(def_level, rep_level); + *is_null = *def_level < descr()->max_definition_level(); + + if (*is_null) { + return true; + } + + if (value_offset_ == values_buffered_) { + throw ParquetException("Value was non-null, but has not been buffered"); + } + *val = values_[value_offset_++]; return true; } // Returns true if there is a next value bool NextValue(T* val, bool* is_null) { - if (value_offset_ == values_buffered_) { + if (level_offset_ == levels_buffered_) { if (!HasNext()) { // Out of data pages return false; @@ -133,7 +157,7 @@ class TypedScanner : public Scanner { int16_t def_level; int16_t rep_level; NextLevels(&def_level, &rep_level); - *is_null = def_level < rep_level; + *is_null = def_level < descr()->max_definition_level(); if (*is_null) { return true; diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h index 99f56b188..1854ebb2e 100644 --- a/src/parquet/column/test-util.h +++ b/src/parquet/column/test-util.h @@ -33,6 +33,7 @@ // Depended on by SerializedPageReader test utilities for now #include "parquet/encodings/plain-encoding.h" #include "parquet/util/input.h" +#include "parquet/util/test-common.h" namespace parquet_cpp { @@ -96,14 +97,14 @@ class DataPageBuilder { have_rep_levels_ = true; } - void AppendValues(const std::vector& values, + void AppendValues(const ColumnDescriptor *d, const std::vector& values, Encoding::type encoding = Encoding::PLAIN) { if (encoding != Encoding::PLAIN) { ParquetException::NYI("only plain encoding currently implemented"); } size_t bytes_to_encode = values.size() * sizeof(T); - PlainEncoder encoder(nullptr); + PlainEncoder encoder(d); encoder.Encode(&values[0], values.size(), sink_); num_values_ = std::max(static_cast(values.size()), num_values_); @@ -164,8 +165,25 @@ class DataPageBuilder { } }; +template<> +void DataPageBuilder::AppendValues(const ColumnDescriptor *d, + const std::vector& values, Encoding::type encoding) { + if (encoding != Encoding::PLAIN) { + ParquetException::NYI("only plain encoding currently implemented"); + } + size_t bytes_to_encode = values.size() * sizeof(bool); + + PlainEncoder encoder(d); + encoder.Encode(values, values.size(), sink_); + + num_values_ = std::max(static_cast(values.size()), num_values_); + encoding_ = encoding; + have_values_ = true; +} + template -static std::shared_ptr MakeDataPage(const std::vector& values, +static std::shared_ptr MakeDataPage(const ColumnDescriptor *d, + const std::vector& values, const std::vector& def_levels, int16_t max_def_level, const std::vector& rep_levels, int16_t max_rep_level) { size_t num_values = values.size(); @@ -181,7 +199,7 @@ static std::shared_ptr MakeDataPage(const std::vector& values, page_builder.AppendDefLevels(def_levels, max_def_level); } - page_builder.AppendValues(values); + page_builder.AppendValues(d, values); auto buffer = page_stream.GetBuffer(); @@ -191,6 +209,37 @@ static std::shared_ptr MakeDataPage(const std::vector& values, page_builder.rep_level_encoding()); } +template +static void Paginate(const ColumnDescriptor *d, + const std::vector& values, + const std::vector& def_levels, int16_t max_def_level, + const std::vector& rep_levels, int16_t max_rep_level, + int num_levels_per_page, const std::vector& values_per_page, + std::vector >& pages) { + int num_pages = values_per_page.size(); + int def_level_start = 0; + int def_level_end = 0; + int rep_level_start = 0; + int rep_level_end = 0; + int value_start = 0; + for (int i = 0; i < num_pages; i++) { + if (max_def_level > 0) { + def_level_start = i * num_levels_per_page; + def_level_end = (i + 1) * num_levels_per_page; + } + if (max_rep_level > 0) { + rep_level_start = i * num_levels_per_page; + rep_level_end = (i + 1) * num_levels_per_page; + } + std::shared_ptr page = MakeDataPage(d, + slice(values, value_start, value_start + values_per_page[i]), + slice(def_levels, def_level_start, def_level_end), max_def_level, + slice(rep_levels, rep_level_start, rep_level_end), max_rep_level); + pages.push_back(page); + value_start += values_per_page[i]; + } +} + } // namespace test } // namespace parquet_cpp diff --git a/src/parquet/encodings/plain-encoding-test.cc b/src/parquet/encodings/plain-encoding-test.cc index 04eb907ee..7ebd21fef 100644 --- a/src/parquet/encodings/plain-encoding-test.cc +++ b/src/parquet/encodings/plain-encoding-test.cc @@ -81,7 +81,8 @@ class EncodeDecode{ void generate_data() { // seed the prng so failure is deterministic - random_numbers(num_values_, 0, draws_); + random_numbers(num_values_, 0, std::numeric_limits::min(), + std::numeric_limits::max(), draws_); } void encode_decode(ColumnDescriptor *d) { @@ -129,6 +130,13 @@ void EncodeDecode::generate_data() { random_bools(num_values_, 0.5, 0, draws_); } +template<> +void EncodeDecode::generate_data() { + // seed the prng so failure is deterministic + random_Int96_numbers(num_values_, 0, std::numeric_limits::min(), + std::numeric_limits::max(), draws_); +} + template<> void EncodeDecode::verify_results() { for (size_t i = 0; i < num_values_; ++i) { @@ -141,8 +149,9 @@ void EncodeDecode::verify_results() { template<> void EncodeDecode::generate_data() { // seed the prng so failure is deterministic - int max_byte_array_len = 12 + sizeof(uint32_t); - size_t nbytes = num_values_ * max_byte_array_len; + int max_byte_array_len = 12; + int num_bytes = max_byte_array_len + sizeof(uint32_t); + size_t nbytes = num_values_ * num_bytes; data_buffer_.resize(nbytes); random_byte_array(num_values_, 0, data_buffer_.data(), draws_, max_byte_array_len); @@ -168,7 +177,7 @@ void EncodeDecode::generate_data() { template<> void EncodeDecode::verify_results() { - for (size_t i = 0; i < 1000; ++i) { + for (size_t i = 0; i < num_values_; ++i) { ASSERT_EQ(0, memcmp(draws_[i].ptr, decode_buf_[i].ptr, flba_length)) << i; } } @@ -213,7 +222,7 @@ TEST(BAEncodeDecode, TestEncodeDecode) { TEST(FLBAEncodeDecode, TestEncodeDecode) { schema::NodePtr node; node = schema::PrimitiveNode::MakeFLBA("name", Repetition::OPTIONAL, - Type::FIXED_LEN_BYTE_ARRAY, flba_length, LogicalType::UTF8); + flba_length, LogicalType::UTF8); ColumnDescriptor d(node, 0, 0); EncodeDecode obj; obj.execute(num_values, &d); diff --git a/src/parquet/schema/descriptor.h b/src/parquet/schema/descriptor.h index 7991deaed..62066efa0 100644 --- a/src/parquet/schema/descriptor.h +++ b/src/parquet/schema/descriptor.h @@ -58,6 +58,18 @@ class ColumnDescriptor { return primitive_node_->name(); } + bool is_required() const { + return max_definition_level_ == 0; + } + + bool is_optional() const { + return max_definition_level_ > 0; + } + + bool is_repeated() const { + return max_repetition_level_ > 0; + } + int type_length() const; private: diff --git a/src/parquet/schema/schema-descriptor-test.cc b/src/parquet/schema/schema-descriptor-test.cc index 83b136d49..767761561 100644 --- a/src/parquet/schema/schema-descriptor-test.cc +++ b/src/parquet/schema/schema-descriptor-test.cc @@ -51,7 +51,7 @@ TEST(TestColumnDescriptor, TestAttrs) { // Test FIXED_LEN_BYTE_ARRAY node = PrimitiveNode::MakeFLBA("name", Repetition::OPTIONAL, - Type::FIXED_LEN_BYTE_ARRAY, 12, LogicalType::UTF8); + 12, LogicalType::UTF8); descr = ColumnDescriptor(node, 4, 1); ASSERT_EQ(Type::FIXED_LEN_BYTE_ARRAY, descr.physical_type()); diff --git a/src/parquet/schema/types.h b/src/parquet/schema/types.h index 83b9fd2c4..e76323f90 100644 --- a/src/parquet/schema/types.h +++ b/src/parquet/schema/types.h @@ -183,10 +183,9 @@ class PrimitiveNode : public Node { // Alternate constructor for FIXED_LEN_BYTE_ARRAY (FLBA) static inline NodePtr MakeFLBA(const std::string& name, - Repetition::type repetition, Type::type type, - int32_t type_length, + Repetition::type repetition, int32_t type_length, LogicalType::type logical_type = LogicalType::NONE) { - NodePtr result = Make(name, repetition, type, logical_type); + NodePtr result = Make(name, repetition, Type::FIXED_LEN_BYTE_ARRAY, logical_type); static_cast(result.get())->SetTypeLength(type_length); return result; } diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h index d8961d19e..49c4131e5 100644 --- a/src/parquet/util/test-common.h +++ b/src/parquet/util/test-common.h @@ -106,49 +106,46 @@ void random_bytes(int n, uint32_t seed, std::vector* out) { } } -template -void random_numbers(int n, uint32_t seed, T* out) { +void random_bools(int n, double p, uint32_t seed, bool* out) { std::mt19937 gen(seed); - std::uniform_real_distribution d(std::numeric_limits::lowest(), - std::numeric_limits::max()); + std::bernoulli_distribution d(p); for (int i = 0; i < n; ++i) { out[i] = d(gen); } } -void random_bools(int n, double p, uint32_t seed, bool* out) { +template +void random_numbers(int n, uint32_t seed, T min_value, T max_value, T* out) { std::mt19937 gen(seed); - std::bernoulli_distribution d(p); + std::uniform_int_distribution d(min_value, max_value); for (int i = 0; i < n; ++i) { out[i] = d(gen); } } template <> -void random_numbers(int n, uint32_t seed, int32_t* out) { +void random_numbers(int n, uint32_t seed, float min_value, float max_value, float* out) { std::mt19937 gen(seed); - std::uniform_int_distribution d(std::numeric_limits::lowest(), - std::numeric_limits::max()); + std::uniform_real_distribution d(min_value, max_value); for (int i = 0; i < n; ++i) { out[i] = d(gen); } } template <> -void random_numbers(int n, uint32_t seed, int64_t* out) { +void random_numbers(int n, uint32_t seed, double min_value, double max_value, + double* out) { std::mt19937 gen(seed); - std::uniform_int_distribution d(std::numeric_limits::lowest(), - std::numeric_limits::max()); + std::uniform_real_distribution d(min_value, max_value); for (int i = 0; i < n; ++i) { out[i] = d(gen); } } -template <> -void random_numbers(int n, uint32_t seed, Int96* out) { +void random_Int96_numbers(int n, uint32_t seed, int32_t min_value, int32_t max_value, + Int96* out) { std::mt19937 gen(seed); - std::uniform_int_distribution d(std::numeric_limits::lowest(), - std::numeric_limits::max()); + std::uniform_int_distribution d(min_value, max_value); for (int i = 0; i < n; ++i) { out[i].value[0] = d(gen); out[i].value[1] = d(gen); @@ -183,6 +180,7 @@ void random_byte_array(int n, uint32_t seed, uint8_t *buf, buf += out[i].len; } } + } // namespace test } // namespace parquet_cpp