diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index e1e409d0a7d9f..23717a442b185 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -140,6 +140,7 @@ set(ARROW_SRCS array/array_binary.cc array/array_decimal.cc array/array_dict.cc + array/array_encoded.cc array/array_nested.cc array/array_primitive.cc array/builder_adaptive.cc @@ -217,6 +218,7 @@ set(ARROW_SRCS util/key_value_metadata.cc util/memory.cc util/mutex.cc + util/rle_util.cc util/string.cc util/string_builder.cc util/task_group.cc @@ -715,6 +717,7 @@ add_arrow_test(array_test array/array_test.cc array/array_binary_test.cc array/array_dict_test.cc + array/array_encoded_test.cc array/array_list_test.cc array/array_struct_test.cc array/array_union_test.cc diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h index 918c7617446e3..7ea95d5e3bf99 100644 --- a/cpp/src/arrow/array.h +++ b/cpp/src/arrow/array.h @@ -34,10 +34,15 @@ /// @{ /// @} +/// \defgroup encoded-arrays Concrete classes for encoded arrays +/// @{ +/// @} + #include "arrow/array/array_base.h" // IWYU pragma: keep #include "arrow/array/array_binary.h" // IWYU pragma: keep #include "arrow/array/array_decimal.h" // IWYU pragma: keep #include "arrow/array/array_dict.h" // IWYU pragma: keep +#include "arrow/array/array_encoded.h" // IWYU pragma: keep #include "arrow/array/array_nested.h" // IWYU pragma: keep #include "arrow/array/array_primitive.h" // IWYU pragma: keep #include "arrow/array/data.h" // IWYU pragma: keep diff --git a/cpp/src/arrow/array/array_base.cc b/cpp/src/arrow/array/array_base.cc index 5d27b2aedfb47..aa895ab88333c 100644 --- a/cpp/src/arrow/array/array_base.cc +++ b/cpp/src/arrow/array/array_base.cc @@ -143,6 +143,10 @@ struct ScalarFromArraySlotImpl { return Status::OK(); } + Status Visit(const RunLengthEncodedArray& a) { + return Status::NotImplemented("Creating scalar from encoded array"); + } + Status Visit(const ExtensionArray& a) { ARROW_ASSIGN_OR_RAISE(auto 
storage, a.storage()->GetScalar(index_)); out_ = std::make_shared(std::move(storage), a.type()); diff --git a/cpp/src/arrow/array/array_encoded.cc b/cpp/src/arrow/array/array_encoded.cc new file mode 100644 index 0000000000000..3dbada8c87037 --- /dev/null +++ b/cpp/src/arrow/array/array_encoded.cc @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/array/array_encoded.h" +#include "arrow/array/util.h" +#include "arrow/util/logging.h" +#include "arrow/util/rle_util.h" + +namespace arrow { + +// ---------------------------------------------------------------------- +// RunLengthEncodedArray + +RunLengthEncodedArray::RunLengthEncodedArray(const std::shared_ptr& data) { + ARROW_CHECK_EQ(data->type->id(), Type::RUN_LENGTH_ENCODED); + SetData(data); +} + +RunLengthEncodedArray::RunLengthEncodedArray(const std::shared_ptr& type, + int64_t length, + const std::shared_ptr& run_ends_array, + const std::shared_ptr& values_array, + int64_t offset) { + ARROW_CHECK_EQ(type->id(), Type::RUN_LENGTH_ENCODED); + SetData(ArrayData::Make(type, length, {NULLPTR}, 0, offset)); + data_->child_data.push_back(run_ends_array->data()); + data_->child_data.push_back(values_array->data()); +} + +Result> RunLengthEncodedArray::Make( + const std::shared_ptr& run_ends_array, + const std::shared_ptr& values_array, int64_t logical_length, int64_t offset) { + if (!RunLengthEncodedType::RunEndsTypeValid(*run_ends_array->type())) { + return Status::Invalid("Run ends array must be int16, int32 or int64 type"); + } + if (run_ends_array->null_count() != 0) { + return Status::Invalid("Run ends array cannot contain null values"); + } + + return std::make_shared( + run_length_encoded(run_ends_array->type(), values_array->type()), logical_length, + run_ends_array, values_array, offset); +} + +std::shared_ptr RunLengthEncodedArray::values_array() const { + return MakeArray(data()->child_data[1]); +} + +std::shared_ptr RunLengthEncodedArray::run_ends_array() const { + return MakeArray(data()->child_data[0]); +} + +int64_t RunLengthEncodedArray::GetPhysicalOffset() const { + const ArraySpan span(*this->data_); + return rle_util::GetPhysicalOffset(span); +} + +int64_t RunLengthEncodedArray::GetPhysicalLength() const { + const ArraySpan span(*this->data_); + return rle_util::GetPhysicalLength(span); +} + +} // 
namespace arrow diff --git a/cpp/src/arrow/array/array_encoded.h b/cpp/src/arrow/array/array_encoded.h new file mode 100644 index 0000000000000..b7cc3479a2fab --- /dev/null +++ b/cpp/src/arrow/array/array_encoded.h @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// Array accessor classes for run-length encoded arrays + +#pragma once + +#include +#include +#include +#include +#include + +#include "arrow/array/array_base.h" +#include "arrow/array/data.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/type_fwd.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/macros.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +/// \addtogroup encoded-arrays +/// +/// @{ + +// ---------------------------------------------------------------------- +// RunLengthEncoded + +/// Concrete Array class for run-length encoded data +class ARROW_EXPORT RunLengthEncodedArray : public Array { + public: + using TypeClass = RunLengthEncodedType; + + explicit RunLengthEncodedArray(const std::shared_ptr& data); + + RunLengthEncodedArray(const std::shared_ptr& type, int64_t length, + const std::shared_ptr& run_ends_array, + const std::shared_ptr& values_array, int64_t offset = 0); + + /// \brief Construct a RunLengthEncodedArray from values and run ends arrays + /// + /// The data type is automatically inferred from the arguments. + /// The run_ends_array and values_array must be the same length. + static Result> Make( + const std::shared_ptr& run_ends_array, + const std::shared_ptr& values_array, int64_t logical_length, + int64_t offset = 0); + + /// \brief Returns an array holding the values of each run. This function does not + /// apply the physical offset to the array + std::shared_ptr values_array() const; + + /// \brief Returns an array holding the logical indexes of each run end. This function + /// does not apply the physical offset to the array + std::shared_ptr run_ends_array() const; + + /// \brief Get the physical offset of the RLE array. Warning: calling this may result in + /// an O(log(N)) binary search on the run ends buffer + int64_t GetPhysicalOffset() const; + + /// \brief Get the physical length of the RLE array. 
Avoid calling this method in a + /// context where you can easily calculate the value yourself. Calling this can result + /// in an O(log(N)) binary search on the run ends buffer + int64_t GetPhysicalLength() const; +}; + +/// @} + +} // namespace arrow diff --git a/cpp/src/arrow/array/array_encoded_test.cc b/cpp/src/arrow/array/array_encoded_test.cc new file mode 100644 index 0000000000000..04882bb06eaeb --- /dev/null +++ b/cpp/src/arrow/array/array_encoded_test.cc @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include + +#include +#include +#include +#include + +#include "arrow/array.h" +#include "arrow/array/builder_nested.h" +#include "arrow/chunked_array.h" +#include "arrow/status.h" +#include "arrow/testing/builder.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type.h" +#include "arrow/util/checked_cast.h" + +namespace arrow { + +using internal::checked_cast; + +// ---------------------------------------------------------------------- +// Run-length encoded array tests + +namespace { + +class TestRunLengthEncodedArray + : public ::testing::TestWithParam> { + protected: + std::shared_ptr run_ends_type; + std::shared_ptr string_values; + std::shared_ptr int32_values; + std::shared_ptr int16_values; + std::shared_ptr size_values; + std::shared_ptr size_only_null; + + void SetUp() override { + run_ends_type = GetParam(); + + string_values = ArrayFromJSON(utf8(), R"(["Hello", "World", null])"); + int32_values = ArrayFromJSON(int32(), "[10, 20, 30]"); + int16_values = ArrayFromJSON(int16(), "[10, 20, 30]"); + size_values = ArrayFromJSON(run_ends_type, "[10, 20, 30]"); + size_only_null = ArrayFromJSON(run_ends_type, "[null, null, null]"); + } +}; + +TEST_P(TestRunLengthEncodedArray, MakeArray) { + ASSERT_OK_AND_ASSIGN(auto rle_array, + RunLengthEncodedArray::Make(int32_values, string_values, 3)); + auto array_data = rle_array->data(); + auto new_array = MakeArray(array_data); + ASSERT_ARRAYS_EQUAL(*new_array, *rle_array); + // should be the exact same ArrayData object + ASSERT_EQ(new_array->data(), array_data); + ASSERT_NE(std::dynamic_pointer_cast(new_array), NULLPTR); +} + +TEST_P(TestRunLengthEncodedArray, FromRunEndsAndValues) { + std::shared_ptr rle_array; + + ASSERT_OK_AND_ASSIGN(rle_array, + RunLengthEncodedArray::Make(size_values, int32_values, 3)); + ASSERT_EQ(rle_array->length(), 3); + ASSERT_ARRAYS_EQUAL(*rle_array->values_array(), *int32_values); + ASSERT_ARRAYS_EQUAL(*rle_array->run_ends_array(), 
*size_values); + ASSERT_EQ(rle_array->offset(), 0); + ASSERT_EQ(rle_array->data()->null_count, 0); + // one dummy buffer, since code may assume there is exactly one buffer + ASSERT_EQ(rle_array->data()->buffers.size(), 1); + + // explicitly passing offset + ASSERT_OK_AND_ASSIGN(rle_array, + RunLengthEncodedArray::Make(size_values, string_values, 2, 1)); + ASSERT_EQ(rle_array->length(), 2); + ASSERT_ARRAYS_EQUAL(*rle_array->values_array(), *string_values); + ASSERT_ARRAYS_EQUAL(*rle_array->run_ends_array(), *size_values); + ASSERT_EQ(rle_array->offset(), 1); + // explicitly access null count variable so it is not calculated automatically + ASSERT_EQ(rle_array->data()->null_count, 0); + + ASSERT_RAISES_WITH_MESSAGE(Invalid, + "Invalid: Run ends array must be int16, int32 or int64 type", + RunLengthEncodedArray::Make(string_values, int32_values, 3)); + ASSERT_RAISES_WITH_MESSAGE( + Invalid, "Invalid: Run ends array cannot contain null values", + RunLengthEncodedArray::Make(size_only_null, int32_values, 3)); +} + +TEST_P(TestRunLengthEncodedArray, OffsetLength) { + auto run_ends = ArrayFromJSON(run_ends_type, "[100, 200, 300, 400, 500]"); + auto values = ArrayFromJSON(utf8(), R"(["Hello", "beautiful", "world", "of", "RLE"])"); + ASSERT_OK_AND_ASSIGN(auto rle_array, + RunLengthEncodedArray::Make(run_ends, values, 500)); + + ASSERT_EQ(rle_array->GetPhysicalLength(), 5); + ASSERT_EQ(rle_array->GetPhysicalOffset(), 0); + + auto slice = std::dynamic_pointer_cast(rle_array->Slice(199, 5)); + ASSERT_EQ(slice->GetPhysicalLength(), 2); + ASSERT_EQ(slice->GetPhysicalOffset(), 1); + + auto slice2 = + std::dynamic_pointer_cast(rle_array->Slice(199, 101)); + ASSERT_EQ(slice2->GetPhysicalLength(), 2); + ASSERT_EQ(slice2->GetPhysicalOffset(), 1); + + auto slice3 = + std::dynamic_pointer_cast(rle_array->Slice(400, 100)); + ASSERT_EQ(slice3->GetPhysicalLength(), 1); + ASSERT_EQ(slice3->GetPhysicalOffset(), 4); + + auto slice4 = + std::dynamic_pointer_cast(rle_array->Slice(0, 150)); + 
ASSERT_EQ(slice4->GetPhysicalLength(), 2); + ASSERT_EQ(slice4->GetPhysicalOffset(), 0); + + auto zero_length_at_end = + std::dynamic_pointer_cast(rle_array->Slice(500, 0)); + ASSERT_EQ(zero_length_at_end->GetPhysicalLength(), 0); + ASSERT_EQ(zero_length_at_end->GetPhysicalOffset(), 5); +} + +INSTANTIATE_TEST_SUITE_P(EncodedArrayTests, TestRunLengthEncodedArray, + ::testing::Values(int16(), int32(), int64())); +} // anonymous namespace + +} // namespace arrow diff --git a/cpp/src/arrow/array/builder_base.cc b/cpp/src/arrow/array/builder_base.cc index e9d5fb44ac1ef..cd59ed08af18f 100644 --- a/cpp/src/arrow/array/builder_base.cc +++ b/cpp/src/arrow/array/builder_base.cc @@ -279,7 +279,7 @@ struct AppendScalarImpl { return Status::NotImplemented("AppendScalar for type ", type); } - Status Convert() { return VisitTypeInline(*(*scalars_begin_)->type, this); } + Status Convert() { return VisitScalarTypeInline(*(*scalars_begin_)->type, this); } const std::shared_ptr* scalars_begin_; const std::shared_ptr* scalars_end_; diff --git a/cpp/src/arrow/array/concatenate.cc b/cpp/src/arrow/array/concatenate.cc index aab734284fa97..ebe0a93da991d 100644 --- a/cpp/src/arrow/array/concatenate.cc +++ b/cpp/src/arrow/array/concatenate.cc @@ -436,6 +436,10 @@ class ConcatenateImpl { return Status::OK(); } + Status Visit(const RunLengthEncodedType& type) { + return Status::NotImplemented("concatenation of ", type); + } + Status Visit(const ExtensionType& e) { // XXX can we just concatenate their storage? 
return Status::NotImplemented("concatenation of ", e); diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc index 0cfa9fcd2e11a..01554591b57ca 100644 --- a/cpp/src/arrow/array/data.cc +++ b/cpp/src/arrow/array/data.cc @@ -195,6 +195,7 @@ int GetNumBuffers(const DataType& type) { case Type::NA: case Type::STRUCT: case Type::FIXED_SIZE_LIST: + case Type::RUN_LENGTH_ENCODED: return 1; case Type::BINARY: case Type::LARGE_BINARY: diff --git a/cpp/src/arrow/array/diff.cc b/cpp/src/arrow/array/diff.cc index 10802939a7302..179f2bb44365b 100644 --- a/cpp/src/arrow/array/diff.cc +++ b/cpp/src/arrow/array/diff.cc @@ -118,6 +118,10 @@ struct ValueComparatorVisitor { return Status::NotImplemented("dictionary type"); } + Status Visit(const RunLengthEncodedType&) { + return Status::NotImplemented("run-length encoded type"); + } + ValueComparator Create(const DataType& type) { DCHECK_OK(VisitTypeInline(type, this)); return out; @@ -382,6 +386,8 @@ Result> Diff(const Array& base, const Array& target return Diff(*base_storage, *target_storage, pool); } else if (base.type()->id() == Type::DICTIONARY) { return Status::NotImplemented("diffing arrays of type ", *base.type()); + } else if (base.type()->id() == Type::RUN_LENGTH_ENCODED) { + return Status::NotImplemented("diffing arrays of type ", *base.type()); } else { return QuadraticSpaceMyersDiff(base, target, pool).Diff(); } @@ -633,6 +639,10 @@ class MakeFormatterImpl { return Status::NotImplemented("formatting diffs between arrays of type ", t); } + Status Visit(const RunLengthEncodedType& t) { + return Status::NotImplemented("formatting diffs between arrays of type ", t); + } + template Formatter MakeTimeFormatter(const std::string& fmt_str) { return [fmt_str](const Array& array, int64_t index, std::ostream* os) { diff --git a/cpp/src/arrow/array/util.cc b/cpp/src/arrow/array/util.cc index c0cdcab730ca4..f3020f3039f69 100644 --- a/cpp/src/arrow/array/util.cc +++ b/cpp/src/arrow/array/util.cc @@ -237,6 +237,7 @@ 
class ArrayDataEndianSwapper { Status Visit(const FixedSizeBinaryType& type) { return Status::OK(); } Status Visit(const FixedSizeListType& type) { return Status::OK(); } Status Visit(const StructType& type) { return Status::OK(); } + Status Visit(const RunLengthEncodedType& type) { return Status::OK(); } Status Visit(const UnionType& type) { out_->buffers[1] = data_->buffers[1]; if (type.mode() == UnionMode::DENSE) { @@ -542,7 +543,7 @@ class RepeatedArrayFactory { : pool_(pool), scalar_(scalar), length_(length) {} Result> Create() { - RETURN_NOT_OK(VisitTypeInline(*scalar_.type, this)); + RETURN_NOT_OK(VisitScalarTypeInline(*scalar_.type, this)); return out_; } diff --git a/cpp/src/arrow/array/validate.cc b/cpp/src/arrow/array/validate.cc index 56470ac74b0c7..dbff12f89c6b6 100644 --- a/cpp/src/arrow/array/validate.cc +++ b/cpp/src/arrow/array/validate.cc @@ -413,6 +413,10 @@ struct ValidateArrayImpl { return Status::OK(); } + Status Visit(const RunLengthEncodedType& type) { + return Status::NotImplemented("validating RLE"); + } + Status Visit(const ExtensionType& type) { // Visit storage return ValidateWithType(*type.storage_type()); diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc index baadd10cca98b..eebc0c3723d7a 100644 --- a/cpp/src/arrow/compare.cc +++ b/cpp/src/arrow/compare.cc @@ -387,6 +387,10 @@ class RangeDataEqualsImpl { return Status::OK(); } + Status Visit(const RunLengthEncodedType& type) { + return Status::NotImplemented("comparing run-length encoded data"); + } + Status Visit(const ExtensionType& type) { // Compare storages result_ &= CompareWithType(*type.storage_type()); @@ -666,6 +670,14 @@ class TypeEqualsVisitor { return Status::OK(); } + Status Visit(const RunLengthEncodedType& left) { + const auto& right = checked_cast(right_); + // Two RLE types are equal only if both the run-ends type and the encoded type match + result_ = left.run_ends_type()->Equals(right.run_ends_type()) && + left.encoded_type()->Equals(right.encoded_type()); + return Status::OK(); + } + Status Visit(const ExtensionType& left) { result_ = left.ExtensionEquals(static_cast(right_)); return 
Status::OK(); diff --git a/cpp/src/arrow/engine/substrait/type_internal.cc b/cpp/src/arrow/engine/substrait/type_internal.cc index 16032df67db63..a41fcf02effda 100644 --- a/cpp/src/arrow/engine/substrait/type_internal.cc +++ b/cpp/src/arrow/engine/substrait/type_internal.cc @@ -326,6 +326,7 @@ struct DataTypeToProtoImpl { Status Visit(const SparseUnionType& t) { return NotImplemented(t); } Status Visit(const DenseUnionType& t) { return NotImplemented(t); } Status Visit(const DictionaryType& t) { return NotImplemented(t); } + Status Visit(const RunLengthEncodedType& t) { return NotImplemented(t); } Status Visit(const MapType& t) { // FIXME assert default field names; custom ones won't roundtrip diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc index 2e450b9d46d8f..64d5105c464a6 100644 --- a/cpp/src/arrow/ipc/metadata_internal.cc +++ b/cpp/src/arrow/ipc/metadata_internal.cc @@ -690,6 +690,10 @@ class FieldToFlatbufferVisitor { return VisitType(*checked_cast(type).value_type()); } + Status Visit(const RunLengthEncodedType& type) { + return Status::NotImplemented("run-length encoded type in ipc"); + } + Status Visit(const ExtensionType& type) { RETURN_NOT_OK(VisitType(*type.storage_type())); extra_type_metadata_[kExtensionTypeKeyName] = type.extension_name(); diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index a1b17afaaf9f4..20409421c8c4f 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -421,6 +421,10 @@ class ArrayLoader { return LoadType(*type.index_type()); } + Status Visit(const RunLengthEncodedType& type) { + return Status::NotImplemented("run-length encoded array in ipc"); + } + Status Visit(const ExtensionType& type) { return LoadType(*type.storage_type()); } BatchDataReadRequest& read_request() { return read_request_; } diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index b89604e6fe1b3..b96fa1f213ec6 100644 --- a/cpp/src/arrow/ipc/writer.cc 
+++ b/cpp/src/arrow/ipc/writer.cc @@ -526,6 +526,10 @@ class RecordBatchSerializer { return VisitType(*array.indices()); } + Status Visit(const RunLengthEncodedArray& type) { + return Status::NotImplemented("run-length encoded array in ipc"); + } + Status Visit(const ExtensionArray& array) { return VisitType(*array.storage()); } Status VisitType(const Array& values) { return VisitArrayInline(values, this); } diff --git a/cpp/src/arrow/json/test_common.h b/cpp/src/arrow/json/test_common.h index c01036047cf72..f0d2d33278789 100644 --- a/cpp/src/arrow/json/test_common.h +++ b/cpp/src/arrow/json/test_common.h @@ -145,6 +145,8 @@ struct GenerateImpl { Status Visit(const UnionType& t) { return NotImplemented(t); } + Status Visit(const RunLengthEncodedType& t) { return NotImplemented(t); } + Status NotImplemented(const DataType& t) { return Status::NotImplemented("random generation of arrays of type ", t); } diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc index 18f4ca6825206..256ca1c7dac58 100644 --- a/cpp/src/arrow/pretty_print.cc +++ b/cpp/src/arrow/pretty_print.cc @@ -377,6 +377,10 @@ class ArrayPrinter : public PrettyPrinter { return PrettyPrint(*array.indices(), ChildOptions(true), sink_); } + Status Visit(const RunLengthEncodedArray& array) { + return Status::NotImplemented("printing run-length encoded array"); + } + Status Print(const Array& array) { RETURN_NOT_OK(VisitArrayInline(array, this)); Flush(); diff --git a/cpp/src/arrow/scalar.cc b/cpp/src/arrow/scalar.cc index 0ca08d7a82e3e..f578e16325ff3 100644 --- a/cpp/src/arrow/scalar.cc +++ b/cpp/src/arrow/scalar.cc @@ -777,7 +777,7 @@ struct MakeNullImpl { std::shared_ptr Finish() && { // Should not fail. 
- DCHECK_OK(VisitTypeInline(*type_, this)); + DCHECK_OK(VisitScalarTypeInline(*type_, this)); return std::move(out_); } @@ -844,7 +844,7 @@ struct ScalarParseImpl { Status FinishWithBuffer() { return Finish(Buffer::FromString(std::string(s_))); } Result> Finish() && { - RETURN_NOT_OK(VisitTypeInline(*type_, this)); + RETURN_NOT_OK(VisitScalarTypeInline(*type_, this)); return std::move(out_); } @@ -1122,7 +1122,7 @@ struct ToTypeVisitor : CastImplVisitor { template Status Visit(const ToType&) { FromTypeVisitor unpack_from_type{from_, to_type_, out_}; - return VisitTypeInline(*from_.type, &unpack_from_type); + return VisitScalarTypeInline(*from_.type, &unpack_from_type); } Status Visit(const NullType&) { @@ -1149,7 +1149,7 @@ Result> Scalar::CastTo(std::shared_ptr to) con if (is_valid) { out->is_valid = true; ToTypeVisitor unpack_to_type{*this, to, out.get()}; - RETURN_NOT_OK(VisitTypeInline(*to, &unpack_to_type)); + RETURN_NOT_OK(VisitScalarTypeInline(*to, &unpack_to_type)); } return out; } diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index 37c430892d022..ebdcc47079f9c 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -64,6 +64,48 @@ namespace arrow { using internal::checked_cast; using internal::checked_pointer_cast; + +std::vector AllTypeIds() { + return {Type::NA, + Type::BOOL, + Type::INT8, + Type::INT16, + Type::INT32, + Type::INT64, + Type::UINT8, + Type::UINT16, + Type::UINT32, + Type::UINT64, + Type::HALF_FLOAT, + Type::FLOAT, + Type::DOUBLE, + Type::DECIMAL128, + Type::DECIMAL256, + Type::DATE32, + Type::DATE64, + Type::TIME32, + Type::TIME64, + Type::TIMESTAMP, + Type::INTERVAL_DAY_TIME, + Type::INTERVAL_MONTHS, + Type::DURATION, + Type::STRING, + Type::BINARY, + Type::LARGE_STRING, + Type::LARGE_BINARY, + Type::FIXED_SIZE_BINARY, + Type::STRUCT, + Type::LIST, + Type::LARGE_LIST, + Type::FIXED_SIZE_LIST, + Type::MAP, + Type::DENSE_UNION, + Type::SPARSE_UNION, + 
Type::DICTIONARY, + Type::RUN_LENGTH_ENCODED, + Type::EXTENSION, + Type::INTERVAL_MONTH_DAY_NANO}; +} using internal::ThreadPool; template diff --git a/cpp/src/arrow/testing/json_internal.cc b/cpp/src/arrow/testing/json_internal.cc index c1d45aa2e0880..2b131a384ded5 100644 --- a/cpp/src/arrow/testing/json_internal.cc +++ b/cpp/src/arrow/testing/json_internal.cc @@ -437,6 +437,10 @@ class SchemaWriter { Status Visit(const DictionaryType& type) { return VisitType(*type.value_type()); } + Status Visit(const RunLengthEncodedType& type) { + return Status::NotImplemented("run-length encoded type in JSON"); + } + Status Visit(const ExtensionType& type) { return Status::NotImplemented(type.name()); } private: @@ -743,6 +747,10 @@ class ArrayWriter { return WriteChildren(type.fields(), children); } + Status Visit(const RunLengthEncodedArray& type) { + return Status::NotImplemented("run-length encoded array in JSON"); + } + Status Visit(const ExtensionArray& array) { return VisitArrayValues(*array.storage()); } private: @@ -1542,6 +1550,10 @@ class ArrayReader { return Status::OK(); } + Status Visit(const RunLengthEncodedType& type) { + return Status::NotImplemented("run-length encoded array in JSON"); + } + Status Visit(const ExtensionType& type) { ArrayReader parser(obj_, pool_, field_->WithType(type.storage_type())); ARROW_ASSIGN_OR_RAISE(data_, parser.Parse()); diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index ea9525404c816..2610305c44d81 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -200,6 +200,7 @@ std::string ToString(Type::type id) { TO_STRING_CASE(DENSE_UNION) TO_STRING_CASE(SPARSE_UNION) TO_STRING_CASE(DICTIONARY) + TO_STRING_CASE(RUN_LENGTH_ENCODED) TO_STRING_CASE(EXTENSION) #undef TO_STRING_CASE @@ -757,6 +758,29 @@ Result> DenseUnionType::Make( return std::make_shared(fields, type_codes); } +// ---------------------------------------------------------------------- +// Run-length encoded type + 
+RunLengthEncodedType::RunLengthEncodedType(std::shared_ptr run_ends_type, + std::shared_ptr encoded_type) + : NestedType(Type::RUN_LENGTH_ENCODED), EncodingType(encoded_type) { + assert(RunEndsTypeValid(*run_ends_type)); + children_ = {std::make_shared("run_ends", run_ends_type, false), + std::make_shared("values", std::move(encoded_type), true)}; +} + +std::string RunLengthEncodedType::ToString() const { + std::stringstream s; + s << name() << "ToString() + << ", values: " << encoded_type()->ToString() << ">"; + return s.str(); +} + +bool RunLengthEncodedType::RunEndsTypeValid(const DataType& run_ends_type) { + return run_ends_type.id() == Type::INT16 || run_ends_type.id() == Type::INT32 || + run_ends_type.id() == Type::INT64; +} + // ---------------------------------------------------------------------- // Struct type @@ -2231,6 +2255,15 @@ std::string UnionType::ComputeFingerprint() const { return ss.str(); } +std::string RunLengthEncodedType::ComputeFingerprint() const { + std::stringstream ss; + ss << TypeIdFingerprint(*this) << "{"; + ss << run_ends_type()->fingerprint() << ";"; + ss << encoded_type()->fingerprint() << ";"; + ss << "}"; + return ss.str(); +} + std::string TimeType::ComputeFingerprint() const { std::stringstream ss; ss << TypeIdFingerprint(*this) << TimeUnitFingerprint(unit_); @@ -2367,6 +2400,13 @@ std::shared_ptr struct_(const std::vector>& fie return std::make_shared(fields); } +std::shared_ptr run_length_encoded( + std::shared_ptr run_ends_type, + std::shared_ptr encoded_type) { + return std::make_shared(std::move(run_ends_type), + std::move(encoded_type)); +} + std::shared_ptr sparse_union(FieldVector child_fields, std::vector type_codes) { if (type_codes.empty()) { diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 415aaacf1c9ef..244b5f60dfbc2 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -1196,6 +1196,48 @@ class ARROW_EXPORT DenseUnionType : public UnionType { std::string name() const override { return 
"dense_union"; } }; +/// \brief Base class that can be subclassed by all encoding types. It allows users to +/// check whether arrays are compatible apart from their encoding, independent of which +/// exact encoding they use +class ARROW_EXPORT EncodingType { + public: + explicit EncodingType(std::shared_ptr encoded_type) + : encoded_type_{encoded_type} {} + + const std::shared_ptr& encoded_type() const { return encoded_type_; } + + private: + std::shared_ptr encoded_type_; +}; + +/// \brief Type class for run-length encoded data +class ARROW_EXPORT RunLengthEncodedType : public NestedType, public EncodingType { + public: + static constexpr Type::type type_id = Type::RUN_LENGTH_ENCODED; + + static constexpr const char* type_name() { return "run_length_encoded"; } + + explicit RunLengthEncodedType(std::shared_ptr run_ends_type, + std::shared_ptr encoded_type); + + DataTypeLayout layout() const override { + // always add one buffer that is NULLPTR, since existing code may assume there + // is at least one buffer + return DataTypeLayout({DataTypeLayout::AlwaysNull()}); + } + + const std::shared_ptr& run_ends_type() const { return fields()[0]->type(); } + + std::string ToString() const override; + + std::string name() const override { return "run_length_encoded"; } + + static bool RunEndsTypeValid(const DataType& run_ends_type); + + private: + std::string ComputeFingerprint() const override; +}; + /// @} // ---------------------------------------------------------------------- @@ -2103,6 +2145,7 @@ static inline bool HasValidityBitmap(Type::type id) { case Type::NA: case Type::DENSE_UNION: case Type::SPARSE_UNION: + case Type::RUN_LENGTH_ENCODED: return false; default: return true; diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h index ba0e635f737b2..4aebc0b953986 100644 --- a/cpp/src/arrow/type_fwd.h +++ b/cpp/src/arrow/type_fwd.h @@ -179,6 +179,9 @@ class DenseUnionArray; class DenseUnionBuilder; struct DenseUnionScalar; +class 
RunLengthEncodedType; +class RunLengthEncodedArray; + template class NumericArray; @@ -405,6 +408,8 @@ struct Type { /// Calendar interval type with three fields. INTERVAL_MONTH_DAY_NANO, + RUN_LENGTH_ENCODED, + // Leave this at the end MAX_ID }; @@ -550,6 +555,10 @@ ARROW_EXPORT std::shared_ptr time64(TimeUnit::type unit); ARROW_EXPORT std::shared_ptr struct_( const std::vector>& fields); +/// \brief Create a RunLengthEncoded instance +ARROW_EXPORT std::shared_ptr run_length_encoded( + std::shared_ptr run_ends_type, std::shared_ptr encoded_type); + /// \brief Create a SparseUnionType instance ARROW_EXPORT std::shared_ptr sparse_union(FieldVector child_fields, std::vector type_codes = {}); diff --git a/cpp/src/arrow/type_test.cc b/cpp/src/arrow/type_test.cc index 954ad63c8aa68..b05622beaa8a6 100644 --- a/cpp/src/arrow/type_test.cc +++ b/cpp/src/arrow/type_test.cc @@ -1879,6 +1879,38 @@ TEST(TypesTest, TestDecimalEquals) { AssertTypeNotEqual(t5, t10); } +TEST(TypesTest, TestRunLengthEncodedType) { + auto int8_make_shared = std::make_shared(int32(), list(int8())); + auto int8_factory = run_length_encoded(int32(), list(int8())); + auto int32_factory = run_length_encoded(int32(), list(int32())); + + ASSERT_EQ(*int8_make_shared, *int8_factory); + ASSERT_NE(*int8_make_shared, *int32_factory); + + ASSERT_EQ(int8_factory->id(), Type::RUN_LENGTH_ENCODED); + ASSERT_EQ(int32_factory->id(), Type::RUN_LENGTH_ENCODED); + + auto int8_rle_type = std::dynamic_pointer_cast(int8_factory); + auto int32_rle_type = std::dynamic_pointer_cast(int32_factory); + + ASSERT_EQ(*int8_rle_type->encoded_type(), *list(int8())); + ASSERT_EQ(*int32_rle_type->encoded_type(), *list(int32())); + + ASSERT_TRUE(int8_rle_type->field(0)->Equals(Field("run_ends", int32(), false))); + ASSERT_TRUE(int8_rle_type->field(1)->Equals(Field("values", list(int8()), true))); + + auto int16_int32 = run_length_encoded(int16(), list(int32())); + auto int64_int32 = run_length_encoded(int64(), list(int32())); + 
ASSERT_NE(*int32_factory, *int16_int32); + ASSERT_NE(*int32_factory, *int64_int32); + ASSERT_NE(*int16_int32, *int64_int32); + + ASSERT_EQ(int8_factory->ToString(), + "run_length_encoded>"); + ASSERT_EQ(int16_int32->ToString(), + "run_length_encoded>"); +} + #define TEST_PREDICATE(all_types, type_predicate) \ for (auto type : all_types) { \ ASSERT_EQ(type_predicate(type->id()), type_predicate(*type)); \ diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h index 587396906631b..6e930a8a1ea31 100644 --- a/cpp/src/arrow/type_traits.h +++ b/cpp/src/arrow/type_traits.h @@ -381,6 +381,14 @@ struct TypeTraits { static inline std::shared_ptr type_singleton() { return large_utf8(); } }; +template <> +struct TypeTraits { + using ArrayType = RunLengthEncodedArray; + // TODO: using BuilderType = RunLengthEncodedBuilder; + + constexpr static bool is_parameter_free = false; +}; + /// @} /// \addtogroup c-type-traits @@ -756,6 +764,18 @@ using is_interval_type = std::is_base_of; template using enable_if_interval = enable_if_t::value, R>; +template +using is_encoding_type = std::is_base_of; + +template +using enable_if_encoding = enable_if_t::value, R>; + +template +using is_run_length_encoded_type = std::is_base_of; + +template +using enable_if_run_length_encoded = enable_if_t::value, R>; + template using is_dictionary_type = std::is_base_of; @@ -1177,6 +1197,7 @@ constexpr bool is_nested(Type::type type_id) { case Type::STRUCT: case Type::SPARSE_UNION: case Type::DENSE_UNION: + case Type::RUN_LENGTH_ENCODED: return true; default: break; diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index 5141e30d0917a..ae05c4dc72129 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -57,6 +57,8 @@ add_arrow_test(utility-test queue_test.cc range_test.cc reflection_test.cc + rle_encoding_test.cc + rle_util_test.cc small_vector_test.cc stl_util_test.cc string_test.cc diff --git 
a/cpp/src/arrow/util/rle_util.cc b/cpp/src/arrow/util/rle_util.cc new file mode 100644 index 0000000000000..83a6878cc0bf3 --- /dev/null +++ b/cpp/src/arrow/util/rle_util.cc @@ -0,0 +1,69 @@ +#include "arrow/util/rle_util.h" +#include +#include "arrow/builder.h" + +namespace arrow { +namespace rle_util { + +template +int64_t FindPhysicalOffset(const RunEndsType* run_ends, int64_t num_run_ends, + int64_t logical_offset) { + auto it = std::upper_bound(run_ends, run_ends + num_run_ends, logical_offset); + int64_t result = std::distance(run_ends, it); + ARROW_DCHECK_LE(result, num_run_ends); + return result; +} + +void AddArtificialOffsetInChildArray(ArrayData* array, int64_t offset) { + auto& child = array->child_data[1]; + auto builder = MakeBuilder(child->type).ValueOrDie(); + ARROW_CHECK_OK(builder->AppendNulls(offset)); + ARROW_CHECK_OK(builder->AppendArraySlice(ArraySpan(*child), 0, child->length)); + array->child_data[1] = builder->Finish().ValueOrDie()->Slice(offset)->data(); +} + +int64_t GetPhysicalOffset(const ArraySpan& span) { + // TODO: caching + auto type_id = RunEndsArray(span).type->id(); + if (type_id == Type::INT16) { + return FindPhysicalOffset(RunEnds(span), RunEndsArray(span).length, + span.offset); + } else if (type_id == Type::INT32) { + return FindPhysicalOffset(RunEnds(span), RunEndsArray(span).length, + span.offset); + } else { + ARROW_CHECK(type_id == Type::INT64); + return FindPhysicalOffset(RunEnds(span), RunEndsArray(span).length, + span.offset); + } +} + +template +static int64_t GetPhysicalLengthInternal(const ArraySpan& span) { + // find the offset of the last element and add 1 + int64_t physical_offset = GetPhysicalOffset(span); + return FindPhysicalOffset(RunEnds(span) + physical_offset, + RunEndsArray(span).length - physical_offset, + span.offset + span.length - 1) + + 1; +} + +int64_t GetPhysicalLength(const ArraySpan& span) { + // TODO: caching + if (span.length == 0) { + return 0; + } else { + auto type_id = 
RunEndsArray(span).type->id(); + if (type_id == Type::INT16) { + return GetPhysicalLengthInternal(span); + } else if (type_id == Type::INT32) { + return GetPhysicalLengthInternal(span); + } else { + ARROW_CHECK_EQ(type_id, Type::INT64); + return GetPhysicalLengthInternal(span); + } + } +} + +} // namespace rle_util +} // namespace arrow diff --git a/cpp/src/arrow/util/rle_util.h b/cpp/src/arrow/util/rle_util.h new file mode 100644 index 0000000000000..2976e5ce9769e --- /dev/null +++ b/cpp/src/arrow/util/rle_util.h @@ -0,0 +1,228 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "arrow/util/logging.h" +#include "arrow/util/macros.h" + +#include "arrow/array/data.h" + +namespace arrow { +namespace rle_util { + +/// \brief Get the physical offset from a logical offset given run end values using binary +/// search. Returns num_run_ends if the physical offset is not within the first +/// num_run_ends elements. +template +int64_t FindPhysicalOffset(const RunEndsType* run_ends, int64_t num_run_ends, + int64_t logical_offset); + +/// \brief Get the physical offset of an RLE ArraySpan. 
Warning: calling this may result +/// in an O(log(N)) binary search on the run ends buffer +int64_t GetPhysicalOffset(const ArraySpan& span); + +/// \brief Get the physical length of an RLE ArraySpan. Avoid calling this method in a +/// context where you can easily calculate the value yourself. Calling this can result in +/// an O(log(N)) binary search on the run ends buffer +int64_t GetPhysicalLength(const ArraySpan& span); + +/// \brief Get the child array holding the run ends from an RLE array (child 0) +static inline const ArraySpan& RunEndsArray(const ArraySpan& span) { + return span.child_data[0]; +} + +/// \brief Get a pointer to the run ends values of an RLE array +template +static inline const RunEndsType* RunEnds(const ArraySpan& span) { + return RunEndsArray(span).GetValues(1); +} + +/// \brief Get the child array holding the data values from an RLE array (child 1) +static inline const ArraySpan& ValuesArray(const ArraySpan& span) { + return span.child_data[1]; +} + +/// \brief Iterate over one or more run-length encoded arrays of equal logical length in +/// segments of runs that are inside run boundaries in each input +template +class MergedRunsIterator { + public: + static constexpr size_t NUM_INPUTS = sizeof...(RunEndsTypes); + template + explicit MergedRunsIterator(InputTypes&... array_spans) : inputs(array_spans...) 
{ + static_assert(sizeof...(InputTypes) == sizeof...(RunEndsTypes), + "number of run ends types and input ArraySpans must be the same"); + if constexpr (NUM_INPUTS == 0) { + // end iterator + logical_length_ = 0; + } else { + logical_length_ = FindCommonLength(); + if (!isEnd()) { + FindMergedRun(); + } + } + } + + /*explicit MergedRunsIterator(ArraySpan array_span) : inputs(Input(array_span)) + { + //static_assert(sizeof...(InputTypes) == sizeof...(RunEndsTypes), "number of run ends + types and input ArraySpans must be the same"); if constexpr (NUM_INPUTS == 0) { + // end interator + logical_length_ = 0; + } else { + logical_length_ = FindCommonLength(); + if (!isEnd()) { + FindMergedRun(); + } + } + } + explicit MergedRunsIterator(ArraySpan array_span_a, ArraySpan array_span_b) : + inputs(Input(array_span_a), Input(array_span_b)) { + //static_assert(sizeof...(InputTypes) == sizeof...(RunEndsTypes), "number of run ends + types and input ArraySpans must be the same"); if constexpr (NUM_INPUTS == 0) { + // end interator + logical_length_ = 0; + } else { + logical_length_ = FindCommonLength(); + if (!isEnd()) { + FindMergedRun(); + } + } + }*/ + + MergedRunsIterator(const MergedRunsIterator& other) = default; + + /// \brief pre-increment: move to the next merged run and return this iterator + MergedRunsIterator& operator++() { + logical_position_ = merged_run_end_; + IncrementInputs(); + if (!isEnd()) { + FindMergedRun(); + } + return *this; + } + + /// \brief post-increment: move to the next merged run and return a copy of the + /// iterator as it was before moving. The copy must be taken by value; a reference to + /// *this would observe the increment and behave like pre-increment + MergedRunsIterator operator++(int) { + MergedRunsIterator result = *this; + ++(*this); + return result; + } + + /// \brief two iterators are equal if both are at their end, or neither is and both + /// are at the same logical position + template + bool operator==(const MergedRunsIterator& other) const { + return (isEnd() && other.isEnd()) || + (!isEnd() && !other.isEnd() && logical_position_ == other.logical_position()); + } + + template + bool operator!=(const MergedRunsIterator& other) const { + return !(*this == other); + } + + /// \brief returns a physical index into the values array buffers of a given input, + /// pointing to the value of the current run. 
The index includes the array offset, so it + /// can be used to access a buffer directly + template + int64_t index_into_buffer() const { + auto& input = std::get(inputs); + return input.run_index + ValuesArray(input.array_span).offset; + } + /// \brief returns a physical index into the values array of a given input, pointing to + /// the value of the current run + template + int64_t index_into_array() const { + return std::get(inputs).run_index; + } + /// \brief returns the logical length of the current run + int64_t run_length() const { return merged_run_end_ - logical_position_; } + /// \brief returns the accumulated length of all runs from the beginning of the array + /// including the current one + int64_t accumulated_run_length() const { return merged_run_end_; } + + bool isEnd() const { return logical_position_ == logical_length_; } + int64_t logical_position() const { return logical_position_; } + + private: + template + struct Input { + Input(const ArraySpan& array_span) : array_span{array_span} { + run_ends = RunEnds(array_span); + run_index = rle_util::FindPhysicalOffset(run_ends, RunEndsArray(array_span).length, + array_span.offset); + // actual value found later by FindMergedRun: + current_run_end = 0; + } + + const ArraySpan& array_span; + const RunEndsType* run_ends; + int64_t run_index; + int64_t current_run_end; + }; + + template + void FindMergedRun() { + if constexpr (input_id == 0) { + merged_run_end_ = std::numeric_limits::max(); + } + auto& input = std::get(inputs); + // logical indices of the end of the run we are currently in each input + input.current_run_end = input.run_ends[input.run_index] - input.array_span.offset; + // the logical length may end in the middle of a run, in case the array was sliced + input.current_run_end = std::min(input.current_run_end, logical_length_); + ARROW_DCHECK_GT(input.current_run_end, logical_position_); + merged_run_end_ = std::min(merged_run_end_, input.current_run_end); + if constexpr (input_id < 
NUM_INPUTS - 1) { + FindMergedRun(); + } + } + + /// \brief returns the logical length of the inputs, verifying with ARROW_CHECK_EQ + /// that every input from input_id onwards has the same length + template + int64_t FindCommonLength() { + int64_t our_length = std::get(inputs).array_span.length; + if constexpr (input_id < NUM_INPUTS - 1) { + int64_t other_length = FindCommonLength(); + ARROW_CHECK_EQ(our_length, other_length) + << "MergedRunsIterator can only be used on arrays of the same length"; + } + return our_length; + } + + /// \brief advances run_index of every input whose current run ends exactly at the + /// current logical position + template + void IncrementInputs() { + auto& input = std::get(inputs); + if (logical_position_ == input.current_run_end) { + input.run_index++; + } + if constexpr (input_id < NUM_INPUTS - 1) { + IncrementInputs(); + } + } + + std::tuple...> inputs; + int64_t logical_position_ = 0; + int64_t logical_length_ = 0; + // zero-initialized because end iterators never call FindMergedRun, yet run_length() + // and accumulated_run_length() read this member + int64_t merged_run_end_ = 0; +}; + +// TODO: this may fit better into some testing header +void AddArtificialOffsetInChildArray(ArrayData* array, int64_t offset); + +} // namespace rle_util +} // namespace arrow diff --git a/cpp/src/arrow/util/rle_util_test.cc b/cpp/src/arrow/util/rle_util_test.cc new file mode 100644 index 0000000000000..b20bd8b89cea5 --- /dev/null +++ b/cpp/src/arrow/util/rle_util_test.cc @@ -0,0 +1,222 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include + +#include "arrow/array.h" +#include "arrow/builder.h" +#include "arrow/compute/api_vector.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type_traits.h" +#include "arrow/util/rle_util.h" + +namespace arrow { +namespace rle_util { + +template +struct RleUtilTest : public ::testing::Test {}; +TYPED_TEST_SUITE_P(RleUtilTest); + +TYPED_TEST_P(RleUtilTest, PhysicalOffset) { + ASSERT_EQ(FindPhysicalOffset((const TypeParam[]){1}, 1, 0), 0); + ASSERT_EQ(FindPhysicalOffset((const TypeParam[]){1, 2, 3}, 3, 0), 0); + ASSERT_EQ(FindPhysicalOffset((const TypeParam[]){1, 2, 3}, 3, 1), 1); + ASSERT_EQ(FindPhysicalOffset((const TypeParam[]){1, 2, 3}, 3, 2), 2); + ASSERT_EQ(FindPhysicalOffset((const TypeParam[]){2, 3, 4}, 3, 0), 0); + ASSERT_EQ(FindPhysicalOffset((const TypeParam[]){2, 3, 4}, 3, 1), 0); + ASSERT_EQ(FindPhysicalOffset((const TypeParam[]){2, 3, 4}, 3, 2), 1); + ASSERT_EQ(FindPhysicalOffset((const TypeParam[]){2, 3, 4}, 3, 3), 2); + ASSERT_EQ(FindPhysicalOffset((const TypeParam[]){2, 4, 6}, 3, 3), 1); + ASSERT_EQ(FindPhysicalOffset((const TypeParam[]){1, 2, 3, 4, 5, 6, 7, 8, 9, 1000, 1005, + 1015, 1020, 1025, 1050}, + 15, 1000), + 10); + + // out-of-range logical offset should return num_run_ends + ASSERT_EQ(FindPhysicalOffset((const TypeParam[]){2, 4, 6}, 3, 6), 3); + ASSERT_EQ(FindPhysicalOffset((const TypeParam[]){2, 4, 6}, 3, 10000), 3); + ASSERT_EQ(FindPhysicalOffset((const TypeParam[]){2, 4, 6}, 0, 5), 0); +} + +TYPED_TEST_P(RleUtilTest, ArtificalOffset) { + const std::shared_ptr run_ends_type = + std::make_shared::ArrowType>(); + + const auto values = ArrayFromJSON(int32(), "[1, 2, 3]"); + const auto run_ends = ArrayFromJSON(run_ends_type, "[10, 20, 30]"); + ASSERT_OK_AND_ASSIGN(auto array, RunLengthEncodedArray::Make(run_ends, values, 30)); + AddArtificialOffsetInChildArray(array->data().get(), 100); + ASSERT_ARRAYS_EQUAL(*values, *array->values_array()); + ASSERT_EQ(array->values_array()->offset(), 100); +} + 
+TYPED_TEST_P(RleUtilTest, MergedRunsInterator) { + /* Construct the following two test arrays with a lot of different offsets to test the + * RLE iterator: left: + * + * child offset: 0 + * | + * +---+---+---+---+---+---+---+---+---+----+----+----+----+----+-----+ + * run_ends | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |1000|1005|1015|1020|1025|30000| + * (Int32Array) +---+---+---+---+---+---+---+---+---+----+----+----+----+----+-----+ + * ---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+-----+ + * values ... | | | | | | | | | | | | | | | | + * (NullArray) ---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+-----+ + * |<--------------- slice of NullArray------------------------------>| + * | | logical length: 50 | + * child offset: 100 |<-------------------->| + * | physical length: 5 | + * | | + * logical offset: 1000 + * physical offset: 10 + * + * right: + * child offset: 0 + * | + * +---+---+---+---+---+------+------+------+------+ + * run_ends | 1 | 2 | 3 | 4 | 5 | 2005 | 2009 | 2025 | 2050 | + * (Int32Array) +---+---+---+---+---+------+------+------+------+ + * ---+---+---+---+---+---+------+------+------+------+ + * values ... 
| | | | | | | | | | + * (NullArray) ---+---+---+---+---+---+------+------+------+------+ + * |<-------- slice of NullArray------------------>| + * | | logical length: 50 | + * child offset: 200 |<-------------------->| + * | physical length: 4 + * | + * logical offset: 2000 + * physical offset: 5 + */ + const std::shared_ptr run_ends_type = + std::make_shared::ArrowType>(); + + const auto left_run_ends = ArrayFromJSON( + run_ends_type, "[1, 2, 3, 4, 5, 6, 7, 8, 9, 1000, 1005, 1015, 1020, 1025, 30000]"); + const auto right_run_ends = + ArrayFromJSON(run_ends_type, "[1, 2, 3, 4, 5, 2005, 2009, 2025, 2050]"); + const std::vector expected_run_lengths = {5, 4, 6, 5, 5, 25}; + const std::vector expected_left_visits = {110, 111, 111, 112, 113, 114}; + const std::vector expected_right_visits = {205, 206, 207, 207, 207, 208}; + const int32_t left_parent_offset = 1000; + const int32_t left_child_offset = 100; + const int32_t right_parent_offset = 2000; + const int32_t right_child_offset = 200; + + std::shared_ptr left_child = + std::make_shared(left_child_offset + left_run_ends->length()); + std::shared_ptr right_child = + std::make_shared(right_child_offset + right_run_ends->length()); + + left_child = left_child->Slice(left_child_offset); + right_child = right_child->Slice(right_child_offset); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr left_array, + RunLengthEncodedArray::Make(left_run_ends, left_child, 1050)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr right_array, + RunLengthEncodedArray::Make(right_run_ends, right_child, 2050)); + left_array = left_array->Slice(left_parent_offset); + right_array = right_array->Slice(right_parent_offset); + ArraySpan left_span(*left_array->data()); + ArraySpan right_span(*right_array->data()); + + size_t position = 0; + size_t logical_position = 0; + for (auto it = MergedRunsIterator(left_span, right_span); + it != MergedRunsIterator(); ++it) { + ASSERT_EQ(it.run_length(), expected_run_lengths[position]); + ASSERT_EQ(it.template 
index_into_buffer<0>(), expected_left_visits[position]); + ASSERT_EQ(it.template index_into_buffer<1>(), expected_right_visits[position]); + ASSERT_EQ(it.template index_into_array<0>(), + expected_left_visits[position] - left_child_offset); + ASSERT_EQ(it.template index_into_array<1>(), + expected_right_visits[position] - right_child_offset); + position++; + logical_position += it.run_length(); + ASSERT_EQ(it.accumulated_run_length(), logical_position); + } + ASSERT_EQ(position, expected_run_lengths.size()); + + // left array only + const std::vector left_only_run_lengths = {5, 10, 5, 5, 25}; + + position = 0; + logical_position = 0; + for (auto it = MergedRunsIterator(left_span, left_span); + it != MergedRunsIterator(); ++it) { + ASSERT_EQ(it.run_length(), left_only_run_lengths[position]); + ASSERT_EQ(it.template index_into_buffer<0>(), 110 + position); + ASSERT_EQ(it.template index_into_buffer<1>(), 110 + position); + ASSERT_EQ(it.template index_into_array<0>(), 10 + position); + ASSERT_EQ(it.template index_into_array<1>(), 10 + position); + position++; + logical_position += it.run_length(); + ASSERT_EQ(it.accumulated_run_length(), logical_position); + } + ASSERT_EQ(position, left_only_run_lengths.size()); + + position = 0; + logical_position = 0; + for (auto it = MergedRunsIterator(left_span); it != MergedRunsIterator(); + ++it) { + ASSERT_EQ(it.run_length(), left_only_run_lengths[position]); + ASSERT_EQ(it.template index_into_buffer<0>(), 110 + position); + ASSERT_EQ(it.template index_into_array<0>(), 10 + position); + position++; + logical_position += it.run_length(); + ASSERT_EQ(it.accumulated_run_length(), logical_position); + } + ASSERT_EQ(position, left_only_run_lengths.size()); + + // right array only + const std::vector right_only_run_lengths = {5, 4, 16, 25}; + + position = 0; + logical_position = 0; + for (auto it = MergedRunsIterator(right_span, right_span); + it != MergedRunsIterator(); ++it) { + ASSERT_EQ(it.run_length(), 
right_only_run_lengths[position]); + ASSERT_EQ(it.template index_into_buffer<0>(), 205 + position); + ASSERT_EQ(it.template index_into_buffer<1>(), 205 + position); + ASSERT_EQ(it.template index_into_array<0>(), 5 + position); + ASSERT_EQ(it.template index_into_array<1>(), 5 + position); + position++; + logical_position += it.run_length(); + ASSERT_EQ(it.accumulated_run_length(), logical_position); + } + ASSERT_EQ(position, right_only_run_lengths.size()); + + position = 0; + logical_position = 0; + for (auto it = MergedRunsIterator(right_span); it != MergedRunsIterator(); + ++it) { + ASSERT_EQ(it.run_length(), right_only_run_lengths[position]); + ASSERT_EQ(it.template index_into_buffer<0>(), 205 + position); + ASSERT_EQ(it.template index_into_array<0>(), 5 + position); + position++; + logical_position += it.run_length(); + ASSERT_EQ(it.accumulated_run_length(), logical_position); + } + ASSERT_EQ(position, right_only_run_lengths.size()); +} + +REGISTER_TYPED_TEST_SUITE_P(RleUtilTest, PhysicalOffset, ArtificalOffset, + MergedRunsInterator); +using RunEndsTypes = testing::Types; +INSTANTIATE_TYPED_TEST_SUITE_P(RleUtilTestt, RleUtilTest, RunEndsTypes); + +} // namespace rle_util +} // namespace arrow diff --git a/cpp/src/arrow/visit_scalar_inline.h b/cpp/src/arrow/visit_scalar_inline.h index f3e8108e9c362..16fc27e2b295b 100644 --- a/cpp/src/arrow/visit_scalar_inline.h +++ b/cpp/src/arrow/visit_scalar_inline.h @@ -52,7 +52,7 @@ namespace arrow { template inline Status VisitScalarInline(const Scalar& scalar, VISITOR* visitor) { switch (scalar.type->id()) { - ARROW_GENERATE_FOR_ALL_TYPES(SCALAR_VISIT_INLINE); + ARROW_GENERATE_FOR_ALL_SCALAR_TYPES(SCALAR_VISIT_INLINE); default: break; } diff --git a/cpp/src/arrow/visit_type_inline.h b/cpp/src/arrow/visit_type_inline.h index 333ceaea1b7dc..a06191f6a6748 100644 --- a/cpp/src/arrow/visit_type_inline.h +++ b/cpp/src/arrow/visit_type_inline.h @@ -55,6 +55,36 @@ inline Status VisitTypeInline(const DataType& type, VISITOR* 
visitor) { return Status::NotImplemented("Type not implemented"); } +/// \brief Calls `visitor` with the corresponding concrete type class. This function does +/// the same as VisitTypeInline, except that it excludes types that can't exist as a +/// Scalar. +/// +/// \tparam VISITOR Visitor type that implements Visit() for Arrow types that can exist +/// as a Scalar. +/// \return Status; Status::NotImplemented if the switch has no case for the type's id +/// +/// A visitor is a type that implements specialized logic for each Arrow type. +/// Example usage: +/// +/// ``` +/// class ExampleVisitor { +/// arrow::Status Visit(const arrow::Int32Type& type) { ... } +/// arrow::Status Visit(const arrow::Int64Type& type) { ... } +/// ... +/// } +/// ExampleVisitor visitor; +/// VisitScalarTypeInline(some_type, &visitor); +/// ``` +template +inline Status VisitScalarTypeInline(const DataType& type, VISITOR* visitor) { + switch (type.id()) { + ARROW_GENERATE_FOR_ALL_SCALAR_TYPES(TYPE_VISIT_INLINE); + default: + break; + } + return Status::NotImplemented("Type not implemented"); +} + #undef TYPE_VISIT_INLINE #define TYPE_VISIT_INLINE(TYPE_CLASS) \ diff --git a/cpp/src/arrow/visitor.cc b/cpp/src/arrow/visitor.cc index d22efc942ed9a..550a9b4aba788 100644 --- a/cpp/src/arrow/visitor.cc +++ b/cpp/src/arrow/visitor.cc @@ -69,6 +69,7 @@ ARRAY_VISITOR_DEFAULT(DenseUnionArray) ARRAY_VISITOR_DEFAULT(DictionaryArray) ARRAY_VISITOR_DEFAULT(Decimal128Array) ARRAY_VISITOR_DEFAULT(Decimal256Array) +ARRAY_VISITOR_DEFAULT(RunLengthEncodedArray) ARRAY_VISITOR_DEFAULT(ExtensionArray) #undef ARRAY_VISITOR_DEFAULT @@ -118,6 +119,7 @@ TYPE_VISITOR_DEFAULT(StructType) TYPE_VISITOR_DEFAULT(SparseUnionType) TYPE_VISITOR_DEFAULT(DenseUnionType) TYPE_VISITOR_DEFAULT(DictionaryType) +TYPE_VISITOR_DEFAULT(RunLengthEncodedType) TYPE_VISITOR_DEFAULT(ExtensionType) #undef TYPE_VISITOR_DEFAULT diff --git a/cpp/src/arrow/visitor.h b/cpp/src/arrow/visitor.h index 7f83c9ebab025..c7ca714b0a080 100644 --- a/cpp/src/arrow/visitor.h +++ b/cpp/src/arrow/visitor.h @@ -68,6 +68,7 @@ class 
ARROW_EXPORT ArrayVisitor { virtual Status Visit(const SparseUnionArray& array); virtual Status Visit(const DenseUnionArray& array); virtual Status Visit(const DictionaryArray& array); + virtual Status Visit(const RunLengthEncodedArray& array); virtual Status Visit(const ExtensionArray& array); }; @@ -116,6 +117,7 @@ class ARROW_EXPORT TypeVisitor { virtual Status Visit(const SparseUnionType& type); virtual Status Visit(const DenseUnionType& type); virtual Status Visit(const DictionaryType& type); + virtual Status Visit(const RunLengthEncodedType& type); virtual Status Visit(const ExtensionType& type); }; diff --git a/cpp/src/arrow/visitor_generate.h b/cpp/src/arrow/visitor_generate.h index 265c76197a4d4..5242ad4160a46 100644 --- a/cpp/src/arrow/visitor_generate.h +++ b/cpp/src/arrow/visitor_generate.h @@ -35,34 +35,39 @@ namespace arrow { ACTION(Float); \ ACTION(Double) -#define ARROW_GENERATE_FOR_ALL_TYPES(ACTION) \ - ACTION(Null); \ - ACTION(Boolean); \ - ARROW_GENERATE_FOR_ALL_NUMERIC_TYPES(ACTION); \ - ACTION(String); \ - ACTION(Binary); \ - ACTION(LargeString); \ - ACTION(LargeBinary); \ - ACTION(FixedSizeBinary); \ - ACTION(Duration); \ - ACTION(Date32); \ - ACTION(Date64); \ - ACTION(Timestamp); \ - ACTION(Time32); \ - ACTION(Time64); \ - ACTION(MonthDayNanoInterval); \ - ACTION(MonthInterval); \ - ACTION(DayTimeInterval); \ - ACTION(Decimal128); \ - ACTION(Decimal256); \ - ACTION(List); \ - ACTION(LargeList); \ - ACTION(Map); \ - ACTION(FixedSizeList); \ - ACTION(Struct); \ - ACTION(SparseUnion); \ - ACTION(DenseUnion); \ - ACTION(Dictionary); \ +// all types that can exist as a Scalar +#define ARROW_GENERATE_FOR_ALL_SCALAR_TYPES(ACTION) \ + ACTION(Null); \ + ACTION(Boolean); \ + ARROW_GENERATE_FOR_ALL_NUMERIC_TYPES(ACTION); \ + ACTION(String); \ + ACTION(Binary); \ + ACTION(LargeString); \ + ACTION(LargeBinary); \ + ACTION(FixedSizeBinary); \ + ACTION(Duration); \ + ACTION(Date32); \ + ACTION(Date64); \ + ACTION(Timestamp); \ + ACTION(Time32); \ + 
ACTION(Time64); \ + ACTION(MonthDayNanoInterval); \ + ACTION(MonthInterval); \ + ACTION(DayTimeInterval); \ + ACTION(Decimal128); \ + ACTION(Decimal256); \ + ACTION(List); \ + ACTION(LargeList); \ + ACTION(Map); \ + ACTION(FixedSizeList); \ + ACTION(Struct); \ + ACTION(SparseUnion); \ + ACTION(DenseUnion); \ + ACTION(Dictionary); \ ACTION(Extension) +#define ARROW_GENERATE_FOR_ALL_TYPES(ACTION) \ + ARROW_GENERATE_FOR_ALL_SCALAR_TYPES(ACTION); \ + ACTION(RunLengthEncoded) + } // namespace arrow diff --git a/cpp/src/parquet/arrow/path_internal.cc b/cpp/src/parquet/arrow/path_internal.cc index f176f66e13122..567bce0d142e3 100644 --- a/cpp/src/parquet/arrow/path_internal.cc +++ b/cpp/src/parquet/arrow/path_internal.cc @@ -801,6 +801,10 @@ class PathBuilder { return Status::OK(); } + Status Visit(const ::arrow::RunLengthEncodedArray& array) { + return Status::NotImplemented("arrow rle array in Parquet"); + } + Status Visit(const ::arrow::FixedSizeListArray& array) { MaybeAddNullable(array); int32_t list_size = array.list_type()->list_size(); diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index f7898c02d479d..fa71d7dbfb546 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -128,6 +128,7 @@ struct ValueBufferSlicer { NOT_IMPLEMENTED_VISIT(Struct); NOT_IMPLEMENTED_VISIT(FixedSizeList); NOT_IMPLEMENTED_VISIT(Dictionary); + NOT_IMPLEMENTED_VISIT(RunLengthEncoded); NOT_IMPLEMENTED_VISIT(Extension); #undef NOT_IMPLEMENTED_VISIT diff --git a/python/pyarrow/src/arrow/python/arrow_to_pandas.cc b/python/pyarrow/src/arrow/python/arrow_to_pandas.cc index f3cee6c65e781..e1233093ab377 100644 --- a/python/pyarrow/src/arrow/python/arrow_to_pandas.cc +++ b/python/pyarrow/src/arrow/python/arrow_to_pandas.cc @@ -1220,6 +1220,7 @@ struct ObjectWriterVisitor { enable_if_t::value || std::is_same::value || std::is_same::value || + std::is_same::value || std::is_same::value || (std::is_base_of::value && !std::is_same::value) 
||