Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43129: [C++][Compute] Fix the unnecessary allocation of extra bytes when encoding row table #43125

Merged
merged 7 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ add_arrow_test(internals_test
key_hash_test.cc
row/compare_test.cc
row/grouper_test.cc
row/row_test.cc
util_internal_test.cc)

add_arrow_compute_test(expression_test SOURCES expression_test.cc)
Expand Down
17 changes: 12 additions & 5 deletions cpp/src/arrow/compute/row/encode_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,21 @@ void RowTableEncoder::PrepareEncodeSelected(int64_t start_row, int64_t num_rows,
Status RowTableEncoder::EncodeSelected(RowTableImpl* rows, uint32_t num_selected,
const uint16_t* selection) {
rows->Clean();
RETURN_NOT_OK(
rows->AppendEmpty(static_cast<uint32_t>(num_selected), static_cast<uint32_t>(0)));

// First AppendEmpty with num_selected rows and zero extra bytes to resize the
// fixed-length buffers (including buffer for offsets).
RETURN_NOT_OK(
rows->AppendEmpty(static_cast<uint32_t>(num_selected),
/*num_extra_bytes_to_append=*/static_cast<uint32_t>(0)));
// Then populate the offsets of the var-length columns, which will be used as the target
// size of the var-length buffers resizing below.
EncoderOffsets::GetRowOffsetsSelected(rows, batch_varbinary_cols_, num_selected,
selection);

RETURN_NOT_OK(rows->AppendEmpty(static_cast<uint32_t>(0),
static_cast<uint32_t>(rows->offsets()[num_selected])));
// Last AppendEmpty with zero rows and zero extra bytes to resize the var-length buffers
// based on the populated offsets.
RETURN_NOT_OK(
rows->AppendEmpty(/*num_rows_to_append=*/static_cast<uint32_t>(0),
/*num_extra_bytes_to_append=*/static_cast<uint32_t>(0)));

for (size_t icol = 0; icol < batch_all_cols_.size(); ++icol) {
if (batch_all_cols_[icol].metadata().is_fixed_length) {
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/compute/row/row_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,13 @@ int64_t RowTableImpl::size_rows_varying_length(int64_t num_bytes) const {
}

// Refreshes the buffers_ slots from the owning null_masks_ / offsets_ / rows_ members.
// Slot 0 always refers to the null masks. For fixed-length rows, slot 1 refers to the
// row data and slot 2 is left unset (nullptr); otherwise slot 1 refers to the offsets
// and slot 2 to the row data.
// NOTE(review): this diff hunk appears to show each removed `->mutable_data()`
// assignment next to its `.get()` replacement — confirm against the merged file.
void RowTableImpl::UpdateBufferPointers() {
buffers_[0] = null_masks_->mutable_data();
buffers_[0] = null_masks_.get();
if (metadata_.is_fixed_length) {
buffers_[1] = rows_->mutable_data();
buffers_[1] = rows_.get();
// No third buffer in the fixed-length case.
buffers_[2] = nullptr;
} else {
buffers_[1] = offsets_->mutable_data();
buffers_[2] = rows_->mutable_data();
buffers_[1] = offsets_.get();
buffers_[2] = rows_.get();
}
}

Expand Down
18 changes: 15 additions & 3 deletions cpp/src/arrow/compute/row/row_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,17 @@ class ARROW_EXPORT RowTableImpl {
// Accessors into the table's buffers
// Read-only pointer to the contents of the i-th buffer; yields NULLPTR when slot i
// holds no buffer. `i` must lie in [0, kMaxBuffers), which is checked by ARROW_DCHECK.
// NOTE(review): the bare `return buffers_[i];` below looks like the removed
// (pre-change) line of this diff hunk shown next to its replacement — confirm
// against the merged header.
const uint8_t* data(int i) const {
ARROW_DCHECK(i >= 0 && i < kMaxBuffers);
return buffers_[i];
// A set slot is hinted as the expected case.
if (ARROW_PREDICT_TRUE(buffers_[i])) {
return buffers_[i]->data();
}
return NULLPTR;
}
// Writable pointer to the contents of the i-th buffer; yields NULLPTR when slot i
// holds no buffer. `i` must lie in [0, kMaxBuffers), which is checked by ARROW_DCHECK.
// NOTE(review): the bare `return buffers_[i];` below looks like the removed
// (pre-change) line of this diff hunk shown next to its replacement — confirm
// against the merged header.
uint8_t* mutable_data(int i) {
ARROW_DCHECK(i >= 0 && i < kMaxBuffers);
return buffers_[i];
// A set slot is hinted as the expected case.
if (ARROW_PREDICT_TRUE(buffers_[i])) {
return buffers_[i]->mutable_data();
}
return NULLPTR;
}
// Read-only view of buffer 1 reinterpreted as uint32_t values.
const uint32_t* offsets() const {
  const uint8_t* raw_bytes = data(1);
  return reinterpret_cast<const uint32_t*>(raw_bytes);
}
// Writable view of buffer 1 reinterpreted as uint32_t values.
uint32_t* mutable_offsets() {
  uint8_t* raw_bytes = mutable_data(1);
  return reinterpret_cast<uint32_t*>(raw_bytes);
}
Expand All @@ -207,6 +213,12 @@ class ARROW_EXPORT RowTableImpl {
/// successive calls
bool has_any_nulls(const LightContext* ctx) const;

/// \brief Size of the table's buffers
int64_t buffer_size(int i) const {
ARROW_DCHECK(i >= 0 && i < kMaxBuffers);
return buffers_[i]->size();
}

private:
Status ResizeFixedLengthBuffers(int64_t num_extra_rows);
Status ResizeOptionalVaryingLengthBuffer(int64_t num_extra_bytes);
Expand Down Expand Up @@ -236,7 +248,7 @@ class ARROW_EXPORT RowTableImpl {
// Stores the fixed-length parts of the rows
std::unique_ptr<ResizableBuffer> rows_;
static constexpr int kMaxBuffers = 3;
uint8_t* buffers_[kMaxBuffers];
ResizableBuffer* buffers_[kMaxBuffers];
// The number of rows in the table
int64_t num_rows_;
// The number of rows that can be stored in the table without resizing
Expand Down
129 changes: 129 additions & 0 deletions cpp/src/arrow/compute/row/row_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <numeric>

#include "arrow/compute/row/encode_internal.h"
#include "arrow/compute/row/row_internal.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"

namespace arrow {
namespace compute {

namespace {

// Test helper: wraps `column` in a single-column ExecBatch of `num_rows` rows, encodes
// those rows (selected in order 0..num_rows-1) into a freshly initialized RowTableImpl
// using the given row/string alignments, and returns the table. Any non-OK Status from
// the intermediate steps is returned instead.
Result<RowTableImpl> MakeRowTableFromColumn(const std::shared_ptr<Array>& column,
                                            int64_t num_rows, int row_alignment,
                                            int string_alignment) {
  // The column has to provide at least the requested number of rows.
  DCHECK_GE(column->length(), num_rows);
  MemoryPool* memory_pool = default_memory_pool();

  // Describe the column both as key-column arrays and as key-column metadata.
  std::vector<KeyColumnArray> key_columns;
  std::vector<Datum> batch_values{column};
  ExecBatch exec_batch(std::move(batch_values), num_rows);
  RETURN_NOT_OK(ColumnArraysFromExecBatch(exec_batch, &key_columns));

  std::vector<KeyColumnMetadata> key_metadatas;
  RETURN_NOT_OK(ColumnMetadatasFromExecBatch(exec_batch, &key_metadatas));
  RowTableMetadata row_metadata;
  row_metadata.FromColumnMetadataVector(key_metadatas, row_alignment, string_alignment);

  // The destination table, initialized with the metadata derived above.
  RowTableImpl table;
  RETURN_NOT_OK(table.Init(memory_pool, row_metadata));

  RowTableEncoder encoder;
  encoder.Init(key_metadatas, row_alignment, string_alignment);
  encoder.PrepareEncodeSelected(0, num_rows, key_columns);

  // Selection vector covering every row: 0, 1, ..., num_rows - 1.
  std::vector<uint16_t> selection(num_rows);
  std::iota(selection.begin(), selection.end(), 0);

  const uint32_t num_selected = static_cast<uint32_t>(num_rows);
  RETURN_NOT_OK(encoder.EncodeSelected(&table, num_selected, selection.data()));

  return table;
}

} // namespace

// GH-43129: Ensure that the memory consumption of the row table is reasonable, that is,
// with the growth factor of 2, the actual memory usage does not exceed twice the amount
// of memory actually needed.
TEST(RowTableMemoryConsumption, Encode) {
// Length of the generated input columns; every row count tried below is smaller.
constexpr int64_t num_rows_max = 8192;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment and GH issue reference to explain what this test is checking for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// Subtracted from every buffer size below before it is compared with the needed size.
constexpr int64_t padding_for_vectors = 64;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refers to this

static constexpr int64_t kPaddingForVectors = 64;

which is always appended to the needed capacity when resizing buffers.


// Constant-valued input columns, generated once with num_rows_max values each.
ASSERT_OK_AND_ASSIGN(
auto fixed_length_column,
::arrow::gen::Constant(std::make_shared<UInt32Scalar>(0))->Generate(num_rows_max));
ASSERT_OK_AND_ASSIGN(auto var_length_column,
::arrow::gen::Constant(std::make_shared<BinaryScalar>("X"))
->Generate(num_rows_max));

// Row counts just below, at, and just above 1024 and 4096.
for (int64_t num_rows : {1023, 1024, 1025, 4095, 4096, 4097}) {
// Fixed length column.
{
SCOPED_TRACE("encoding fixed length column of " + std::to_string(num_rows) +
" rows");
ASSERT_OK_AND_ASSIGN(auto row_table,
MakeRowTableFromColumn(fixed_length_column, num_rows,
uint32()->byte_width(), 0));
// A fixed-length table is expected to expose buffers 0 and 1, but no buffer 2.
ASSERT_NE(row_table.data(0), NULLPTR);
ASSERT_NE(row_table.data(1), NULLPTR);
ASSERT_EQ(row_table.data(2), NULLPTR);

// Each pair of checks below bounds a buffer (minus padding) from both sides: it
// must hold the needed bytes, and must be smaller than twice the needed bytes.
int64_t actual_null_mask_size =
num_rows * row_table.metadata().null_masks_bytes_per_row;
ASSERT_LE(actual_null_mask_size, row_table.buffer_size(0) - padding_for_vectors);
ASSERT_GT(actual_null_mask_size * 2,
row_table.buffer_size(0) - padding_for_vectors);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also check that the buffer size is large enough? Same for other inequalities below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, will add them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


// Buffer 1: one uint32-wide value per row.
int64_t actual_rows_size = num_rows * uint32()->byte_width();
ASSERT_LE(actual_rows_size, row_table.buffer_size(1) - padding_for_vectors);
ASSERT_GT(actual_rows_size * 2, row_table.buffer_size(1) - padding_for_vectors);
}

// Var length column.
{
SCOPED_TRACE("encoding var length column of " + std::to_string(num_rows) + " rows");
ASSERT_OK_AND_ASSIGN(auto row_table,
MakeRowTableFromColumn(var_length_column, num_rows, 4, 4));
// A var-length table is expected to expose all three buffers.
ASSERT_NE(row_table.data(0), NULLPTR);
ASSERT_NE(row_table.data(1), NULLPTR);
ASSERT_NE(row_table.data(2), NULLPTR);

// Same two-sided bound as in the fixed-length case, applied to each buffer.
int64_t actual_null_mask_size =
num_rows * row_table.metadata().null_masks_bytes_per_row;
ASSERT_LE(actual_null_mask_size, row_table.buffer_size(0) - padding_for_vectors);
ASSERT_GT(actual_null_mask_size * 2,
row_table.buffer_size(0) - padding_for_vectors);

// Buffer 1: one uint32_t offset per row.
int64_t actual_offset_size = num_rows * sizeof(uint32_t);
ASSERT_LE(actual_offset_size, row_table.buffer_size(1) - padding_for_vectors);
ASSERT_GT(actual_offset_size * 2, row_table.buffer_size(1) - padding_for_vectors);

// Buffer 2: offsets()[1] is used as the per-row size.
// NOTE(review): presumably valid because every input value is the same ("X"),
// so all encoded rows have equal size — confirm.
int64_t actual_rows_size = num_rows * row_table.offsets()[1];
ASSERT_LE(actual_rows_size, row_table.buffer_size(2) - padding_for_vectors);
ASSERT_GT(actual_rows_size * 2, row_table.buffer_size(2) - padding_for_vectors);
}
}
}

} // namespace compute
} // namespace arrow
Loading