Skip to content

Commit

Permalink
apacheGH-32338: [C++] Add IPC support for Run-End Encoded Arrays (apa…
Browse files Browse the repository at this point in the history
…che#34550)

 * Closes apache#14340
 * Closes apache#32773
 * Closes apache#14179
 
* Closes: apache#32338

Lead-authored-by: Felipe Oliveira Carvalho <[email protected]>
Co-authored-by: Tobias Zagorni <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
2 people authored and rtpsw committed Mar 27, 2023
1 parent 2ff376f commit 77ab504
Show file tree
Hide file tree
Showing 15 changed files with 348 additions and 14 deletions.
78 changes: 78 additions & 0 deletions cpp/src/arrow/array/array_run_end.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include "arrow/array/array_run_end.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/array/util.h"
#include "arrow/util/logging.h"
#include "arrow/util/ree_util.h"
Expand Down Expand Up @@ -85,6 +86,83 @@ void RunEndEncodedArray::SetData(const std::shared_ptr<ArrayData>& data) {
values_array_ = MakeArray(this->data()->child_data[1]);
}

namespace {

/// \brief Build the run-ends of `self` as seen through its logical window
///
/// The logical window is [self.offset(), self.offset() + self.length()).
///
/// - offset != 0: every covered run-end is rewritten relative to the offset
///   and the final one is clamped to the logical length.
/// - offset == 0 and the logical length coincides with a physical run-end
///   (or the length is 0): a slice of the existing run-ends array is
///   returned.
/// - offset == 0 otherwise: the covered run-ends are copied into a new
///   buffer and only the final one is replaced by the logical length.
///
/// \param self the run-end encoded array
/// \param pool memory pool used whenever a new run-ends buffer is built
/// \return the logical run-ends array, or the error raised while allocating
template <typename RunEndType>
Result<std::shared_ptr<Array>> MakeLogicalRunEnds(const RunEndEncodedArray& self,
                                                  MemoryPool* pool) {
  using RunEndCType = typename RunEndType::c_type;

  if (self.offset() != 0) {
    // A non-zero logical offset shifts every run-end, so all of them have to
    // be recomputed.
    const int64_t first_run = self.FindPhysicalOffset();
    const int64_t num_runs = self.FindPhysicalLength();

    const auto* physical_run_ends =
        self.data()->child_data[0]->GetValues<RunEndCType>(1);
    NumericBuilder<RunEndType> shifted(pool);
    RETURN_NOT_OK(shifted.Resize(num_runs));
    if (num_runs > 0) {
      const int64_t last_run = num_runs - 1;
      for (int64_t run = 0; run < last_run; ++run) {
        const auto shifted_end = physical_run_ends[first_run + run] - self.offset();
        DCHECK_LT(shifted_end, self.length());
        RETURN_NOT_OK(shifted.Append(static_cast<RunEndCType>(shifted_end)));
      }
      // The last covered run may extend past the logical window; clamp it.
      DCHECK_GE(physical_run_ends[first_run + last_run] - self.offset(), self.length());
      RETURN_NOT_OK(shifted.Append(static_cast<RunEndCType>(self.length())));
    }
    return shifted.Finish();
  }

  const auto& all_run_ends = *self.run_ends();
  if (self.length() == 0) {
    return all_run_ends.Slice(0, 0);
  }

  // Zero offset and the logical length lands exactly on a physical run-end:
  // the existing run-ends can be shared through a slice.
  const int64_t num_runs = self.FindPhysicalLength();
  const auto* physical_run_ends =
      self.data()->child_data[0]->GetValues<RunEndCType>(1);
  const int64_t last_run = num_runs - 1;
  if (physical_run_ends[last_run] == self.length()) {
    return all_run_ends.Slice(0, num_runs);
  }

  // Zero offset but the window ends inside a run: copy the covered run-ends
  // and overwrite just the final one with the logical length.
  auto clamped_data = ArrayData::Make(all_run_ends.type(), num_runs, 0, 0);
  {
    ARROW_ASSIGN_OR_RAISE(auto values_buffer,
                          AllocateBuffer(num_runs * sizeof(RunEndCType), pool));
    clamped_data->buffers = {NULLPTR, std::move(values_buffer)};
  }
  auto* clamped_run_ends = clamped_data->GetMutableValues<RunEndCType>(1);
  memcpy(clamped_run_ends, physical_run_ends, last_run * sizeof(RunEndCType));
  clamped_run_ends[last_run] = static_cast<RunEndCType>(self.length());
  return MakeArray(std::move(clamped_data));
}

} // namespace

// Dispatches to MakeLogicalRunEnds<> on the run-end type of this array.
// Run ends are expected to be int16, int32 or int64; anything that is not
// int16/int32 is handled as int64 (checked in debug builds).
Result<std::shared_ptr<Array>> RunEndEncodedArray::LogicalRunEnds(
    MemoryPool* pool) const {
  DCHECK(data()->child_data[0]->buffers[1]->is_cpu());
  const auto run_end_type_id = run_ends_array_->type_id();
  if (run_end_type_id == Type::INT16) {
    return MakeLogicalRunEnds<Int16Type>(*this, pool);
  }
  if (run_end_type_id == Type::INT32) {
    return MakeLogicalRunEnds<Int32Type>(*this, pool);
  }
  DCHECK_EQ(run_ends_array_->type_id(), Type::INT64);
  return MakeLogicalRunEnds<Int64Type>(*this, pool);
}

std::shared_ptr<Array> RunEndEncodedArray::LogicalValues() const {
const int64_t physical_offset = FindPhysicalOffset();
const int64_t physical_length = FindPhysicalLength();
return MakeArray(data()->child_data[1]->Slice(physical_offset, physical_length));
}

int64_t RunEndEncodedArray::FindPhysicalOffset() const {
const ArraySpan span(*this->data_);
return ree_util::FindPhysicalIndex(span, 0, span.offset);
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/arrow/array/array_run_end.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,19 @@ class ARROW_EXPORT RunEndEncodedArray : public Array {
/// The physical offset to the array is applied.
const std::shared_ptr<Array>& values() const { return values_array_; }

/// \brief Returns an array holding the logical indexes of each run end
///
/// The returned run ends are relative to the logical offset of this array
/// and the last one is equal to the logical length.
///
/// If the logical offset is zero and the logical length is zero or
/// coincides with a physical run end, a slice of the existing run ends
/// array is returned. If the logical offset is zero but the logical length
/// ends in the middle of a run, the covered run ends are copied into a new
/// buffer and only the last one is adjusted. If a non-zero logical offset is
/// set, this function builds a new array and rewrites all the run end
/// values to be relative to the logical offset and cuts the end of the
/// array to the logical length.
///
/// \param[in] pool memory pool used when new run ends have to be allocated
/// \return the logical run ends, or an error if an allocation fails
Result<std::shared_ptr<Array>> LogicalRunEnds(MemoryPool* pool) const;

/// \brief Returns an array holding the values of each run
///
/// The result is a slice of the physical values array that starts at
/// FindPhysicalOffset() and spans FindPhysicalLength() elements, i.e. it
/// contains only the values of the runs within the logical range. The
/// values themselves are not rewritten.
std::shared_ptr<Array> LogicalValues() const;

/// \brief Find the physical offset of this REE array
///
/// This function uses binary-search, so it has a O(log N) cost.
Expand Down
49 changes: 49 additions & 0 deletions cpp/src/arrow/array/array_run_end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,55 @@ TEST_P(TestRunEndEncodedArray, FindOffsetAndLength) {
ASSERT_EQ(zero_length_at_end->FindPhysicalLength(), 0);
}

// Checks RunEndEncodedArray::LogicalRunEnds() on an unsliced array, on
// zero-offset slices that end exactly on a run end or inside a run, and on a
// slice with a non-zero offset.
TEST_P(TestRunEndEncodedArray, LogicalRunEnds) {
  auto physical_run_ends = ArrayFromJSON(run_end_type, "[100, 200, 300, 400, 500]");
  auto run_values =
      ArrayFromJSON(utf8(), R"(["Hello", "beautiful", "world", "of", "REE"])");
  ASSERT_OK_AND_ASSIGN(auto ree,
                       RunEndEncodedArray::Make(500, physical_run_ends, run_values));

  auto* pool = default_memory_pool();

  // Unsliced: the logical run ends are the physical run ends.
  {
    ASSERT_OK_AND_ASSIGN(auto actual, ree->LogicalRunEnds(pool));
    ASSERT_ARRAYS_EQUAL(*actual, *physical_run_ends);
  }

  // offset=0, length=0
  {
    const auto empty_slice = ree->Slice(0, 0);
    const auto* empty_ree = checked_cast<const RunEndEncodedArray*>(empty_slice.get());
    ASSERT_OK_AND_ASSIGN(auto actual, empty_ree->LogicalRunEnds(pool));
    ASSERT_ARRAYS_EQUAL(*actual, *ArrayFromJSON(run_end_type, "[]"));
  }

  // offset=0, length=<a run-end>
  for (int num_runs = 1; num_runs <= 4; ++num_runs) {
    const auto expected = physical_run_ends->Slice(0, num_runs);
    const auto aligned_slice = ree->Slice(0, num_runs * 100);
    const auto* aligned_ree =
        checked_cast<const RunEndEncodedArray*>(aligned_slice.get());
    ASSERT_OK_AND_ASSIGN(auto actual, aligned_ree->LogicalRunEnds(pool));
    ASSERT_ARRAYS_EQUAL(*actual, *expected);
  }

  // offset=0, length=<length in the middle of a run>
  for (int num_runs = 2; num_runs <= 4; ++num_runs) {
    const int logical_length = num_runs * 100 - 50;

    // All run ends before the last one are untouched; the last one is
    // clamped to the logical length.
    std::string expected_json;
    for (int run = 1; run < num_runs; ++run) {
      expected_json += (run == 1 ? "[" : ", ");
      expected_json += std::to_string(run * 100);
    }
    expected_json += ", " + std::to_string(logical_length) + "]";
    const std::shared_ptr<Array> expected = ArrayFromJSON(run_end_type, expected_json);

    const auto clipped_slice = ree->Slice(0, logical_length);
    const auto* clipped_ree =
        checked_cast<const RunEndEncodedArray*>(clipped_slice.get());
    ASSERT_OK_AND_ASSIGN(auto actual, clipped_ree->LogicalRunEnds(pool));
    ASSERT_ARRAYS_EQUAL(*actual, *expected);
  }

  // offset != 0
  {
    const auto offset_slice = ree->Slice(50, 400);
    const auto* offset_ree = checked_cast<const RunEndEncodedArray*>(offset_slice.get());
    const auto expected = ArrayFromJSON(run_end_type, "[50, 150, 250, 350, 400]");
    ASSERT_OK_AND_ASSIGN(auto actual, offset_ree->LogicalRunEnds(pool));
    ASSERT_ARRAYS_EQUAL(*actual, *expected);
  }
}

TEST_P(TestRunEndEncodedArray, Builder) {
auto value_type = utf8();
auto ree_type = run_end_encoded(run_end_type, value_type);
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/ipc/feather_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ const std::vector<test::MakeRecordBatch*> kBatchCases = {
&ipc::test::MakeDecimal,
&ipc::test::MakeBooleanBatch,
&ipc::test::MakeFloatBatch,
&ipc::test::MakeIntervals};
&ipc::test::MakeIntervals,
&ipc::test::MakeRunEndEncoded};

} // namespace

Expand Down
18 changes: 16 additions & 2 deletions cpp/src/arrow/ipc/metadata_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,19 @@ Status ConcreteTypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
case flatbuf::Type::Union:
return UnionFromFlatbuffer(static_cast<const flatbuf::Union*>(type_data), children,
out);
case flatbuf::Type::RunEndEncoded:
if (children.size() != 2) {
return Status::Invalid("RunEndEncoded must have exactly 2 child fields");
}
if (!is_run_end_type(children[0]->type()->id())) {
return Status::Invalid(
"RunEndEncoded run_ends field must be typed as: int16, int32, or int64");
}
*out =
std::make_shared<RunEndEncodedType>(children[0]->type(), children[1]->type());
return Status::OK();
default:
return Status::Invalid("Unrecognized type:" + ToChars(static_cast<int>(type)));
return Status::Invalid("Unrecognized type: " + ToChars(static_cast<int>(type)));
}
}

Expand Down Expand Up @@ -691,7 +702,10 @@ class FieldToFlatbufferVisitor {
}

Status Visit(const RunEndEncodedType& type) {
return Status::NotImplemented("run-end encoded type in IPC");
fb_type_ = flatbuf::Type::RunEndEncoded;
RETURN_NOT_OK(VisitChildFields(type));
type_offset_ = flatbuf::CreateRunEndEncoded(fbb_).Union();
return Status::OK();
}

Status Visit(const ExtensionType& type) {
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,9 @@ class ArrayLoader {
}

Status Visit(const RunEndEncodedType& type) {
return Status::NotImplemented("run-end encoded array in IPC");
out_->buffers.resize(1);
RETURN_NOT_OK(LoadCommon(type.id()));
return LoadChildren(type.fields());
}

Status Visit(const ExtensionType& type) { return LoadType(*type.storage_type()); }
Expand Down
53 changes: 53 additions & 0 deletions cpp/src/arrow/ipc/test_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_builders.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/ree_util.h"

namespace arrow {

Expand Down Expand Up @@ -539,6 +541,57 @@ Status MakeStruct(std::shared_ptr<RecordBatch>* out) {
return Status::OK();
}

/// \brief Give the second child of `array` a non-zero offset
///
/// The child at index 1 is rebuilt as `offset` nulls followed by its
/// original contents and then replaced by a slice of that new array starting
/// at `offset`, so the child keeps its logical contents but carries an
/// offset.
///
/// \param[in,out] array array data whose child_data[1] is replaced
/// \param[in] offset number of leading nulls to prepend and then skip
/// \return Status::OK() on success, otherwise the error raised while
///         creating, filling or finishing the builder
Status AddArtificialOffsetInChildArray(ArrayData* array, int64_t offset) {
  auto& child = array->child_data[1];
  // Propagate failures through the returned Status instead of aborting the
  // process with ValueOrDie().
  ARROW_ASSIGN_OR_RAISE(auto builder, MakeBuilder(child->type));
  ARROW_RETURN_NOT_OK(builder->AppendNulls(offset));
  ARROW_RETURN_NOT_OK(builder->AppendArraySlice(ArraySpan(*child), 0, child->length));
  ARROW_ASSIGN_OR_RAISE(auto padded_child, builder->Finish());
  array->child_data[1] = padded_child->Slice(offset)->data();
  return Status::OK();
}

/// \brief Build a record batch of run-end encoded columns
///
/// Two groups of four columns are generated from the same random generator:
/// first an unsliced group, then a group (field names suffixed with
/// "_sliced") whose arrays are generated longer, get an artificial offset in
/// their values child and are sliced back to the common logical length.
///
/// \param[out] out the resulting record batch
Status MakeRunEndEncoded(std::shared_ptr<RecordBatch>* out) {
  const int64_t num_rows = 10000;
  const int64_t front_padding = 2000;
  random::RandomArrayGenerator rand(/*seed =*/1);
  std::vector<std::shared_ptr<Array>> batch_columns;
  std::vector<std::shared_ptr<Field>> batch_fields;
  for (const bool sliced : {false, true}) {
    // Sliced columns are generated with extra room on both sides.
    int64_t generate_length = num_rows;
    if (sliced) {
      generate_length = num_rows + 2 * front_padding;
    }

    std::vector<std::shared_ptr<Array>> columns = {
        rand.RunEndEncoded(int32(), generate_length, 0.5),
        rand.RunEndEncoded(int32(), generate_length, 0),
        rand.RunEndEncoded(utf8(), generate_length, 0.5),
        rand.RunEndEncoded(list(int32()), generate_length, 0.5),
    };
    std::vector<std::shared_ptr<Field>> column_fields = {
        field("ree_int32", run_end_encoded(int32(), int32())),
        field("ree_int32_not_null", run_end_encoded(int32(), int32()), false),
        field("ree_string", run_end_encoded(int32(), utf8())),
        field("ree_list", run_end_encoded(int32(), list(int32()))),
    };

    if (sliced) {
      for (size_t i = 0; i < columns.size(); ++i) {
        ARROW_RETURN_NOT_OK(
            AddArtificialOffsetInChildArray(columns[i]->data().get(), front_padding));
        columns[i] = columns[i]->Slice(front_padding, num_rows);
      }
      for (size_t i = 0; i < column_fields.size(); ++i) {
        const auto& original = column_fields[i];
        column_fields[i] = field(original->name() + "_sliced", original->type(),
                                 original->nullable(), original->metadata());
      }
    }

    for (const auto& column : columns) {
      batch_columns.push_back(column);
    }
    for (const auto& column_field : column_fields) {
      batch_fields.push_back(column_field);
    }
  }
  *out = RecordBatch::Make(schema(batch_fields), num_rows, batch_columns);
  return Status::OK();
}

Status MakeUnion(std::shared_ptr<RecordBatch>* out) {
// Define schema
std::vector<std::shared_ptr<Field>> union_fields(
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/ipc/test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ Status MakeDeeplyNestedList(std::shared_ptr<RecordBatch>* out);
ARROW_TESTING_EXPORT
Status MakeStruct(std::shared_ptr<RecordBatch>* out);

ARROW_TESTING_EXPORT
Status MakeRunEndEncoded(std::shared_ptr<RecordBatch>* out);

ARROW_TESTING_EXPORT
Status MakeUnion(std::shared_ptr<RecordBatch>* out);

Expand Down
14 changes: 12 additions & 2 deletions cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,18 @@ class RecordBatchSerializer {
return VisitType(*array.indices());
}

Status Visit(const RunEndEncodedArray& type) {
return Status::NotImplemented("run-end encoded array in IPC");
// Visits a run-end encoded array by visiting its two children: the logical
// run ends (obtained with options_.memory_pool) followed by the logical
// values. The recursion depth budget is decremented while the children are
// visited.
Status Visit(const RunEndEncodedArray& array) {
// NOTE: LogicalRunEnds() copies the whole run ends array to add an offset and
// clip the ends. To improve performance (by avoiding the extra allocation
// and memory writes) we could fuse this process with serialization.
ARROW_ASSIGN_OR_RAISE(const auto run_ends,
array.LogicalRunEnds(options_.memory_pool));
const auto values = array.LogicalValues();
// The children sit one nesting level below this array.
// NOTE(review): if either VisitArray() call returns early with an error, the
// matching increment below is skipped -- confirm this is acceptable for
// callers that reuse the serializer after a failure.
--max_recursion_depth_;
RETURN_NOT_OK(VisitArray(*run_ends));
RETURN_NOT_OK(VisitArray(*values));
++max_recursion_depth_;
return Status::OK();
}

Status Visit(const ExtensionArray& array) { return VisitType(*array.storage()); }
Expand Down
16 changes: 14 additions & 2 deletions cpp/src/arrow/testing/json_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -748,8 +748,9 @@ void TestSchemaRoundTrip(const Schema& schema) {

DictionaryMemo in_memo;
std::shared_ptr<Schema> out;
if (!json::ReadSchema(d, default_memory_pool(), &in_memo, &out).ok()) {
FAIL() << "Unable to read JSON schema: " << json_schema;
const auto status = json::ReadSchema(d, default_memory_pool(), &in_memo, &out);
if (!status.ok()) {
FAIL() << "Unable to read JSON schema: " << json_schema << "\nStatus: " << status;
}

if (!schema.Equals(*out)) {
Expand Down Expand Up @@ -830,6 +831,9 @@ TEST(TestJsonSchemaWriter, FlatTypes) {
{0, 1})),
field("f19", large_list(uint8())),
field("f20", null()),
field("f21", run_end_encoded(int16(), utf8())),
field("f22", run_end_encoded(int32(), utf8())),
field("f23", run_end_encoded(int64(), utf8())),
};

Schema schema(fields);
Expand Down Expand Up @@ -923,6 +927,14 @@ TEST(TestJsonArrayWriter, NestedTypes) {
StructArray struct_array(struct_type, static_cast<int>(struct_is_valid.size()), fields,
struct_bitmap, 2);
TestArrayRoundTrip(struct_array);

// Run-End Encoded Type
auto run_ends = ArrayFromJSON(int32(), "[100, 200, 300, 400, 500, 600, 700]");
ASSERT_OK_AND_ASSIGN(auto ree_array,
RunEndEncodedArray::Make(700, run_ends, i16_values_array));
TestArrayRoundTrip(*ree_array);
auto sliced_ree_array = ree_array->Slice(150, 300);
TestArrayRoundTrip(*sliced_ree_array);
}

TEST(TestJsonArrayWriter, Unions) {
Expand Down
Loading

0 comments on commit 77ab504

Please sign in to comment.