diff --git a/cpp/src/arrow/compute/CMakeLists.txt b/cpp/src/arrow/compute/CMakeLists.txt index 0a8018cd580cf..e20b45897db95 100644 --- a/cpp/src/arrow/compute/CMakeLists.txt +++ b/cpp/src/arrow/compute/CMakeLists.txt @@ -92,6 +92,7 @@ add_arrow_test(internals_test key_hash_test.cc row/compare_test.cc row/grouper_test.cc + row/row_test.cc util_internal_test.cc) add_arrow_compute_test(expression_test SOURCES expression_test.cc) diff --git a/cpp/src/arrow/compute/row/encode_internal.cc b/cpp/src/arrow/compute/row/encode_internal.cc index 01d552ef8270f..88ab5b81b1e0a 100644 --- a/cpp/src/arrow/compute/row/encode_internal.cc +++ b/cpp/src/arrow/compute/row/encode_internal.cc @@ -152,14 +152,21 @@ void RowTableEncoder::PrepareEncodeSelected(int64_t start_row, int64_t num_rows, Status RowTableEncoder::EncodeSelected(RowTableImpl* rows, uint32_t num_selected, const uint16_t* selection) { rows->Clean(); - RETURN_NOT_OK( - rows->AppendEmpty(static_cast(num_selected), static_cast(0))); + // First AppendEmpty with num_selected rows and zero extra bytes to resize the + // fixed-length buffers (including buffer for offsets). + RETURN_NOT_OK( + rows->AppendEmpty(static_cast(num_selected), + /*num_extra_bytes_to_append=*/static_cast(0))); + // Then populate the offsets of the var-length columns, which will be used as the target + // size of the var-length buffers resizing below. EncoderOffsets::GetRowOffsetsSelected(rows, batch_varbinary_cols_, num_selected, selection); - - RETURN_NOT_OK(rows->AppendEmpty(static_cast(0), - static_cast(rows->offsets()[num_selected]))); + // Last AppendEmpty with zero rows and zero extra bytes to resize the var-length buffers + // based on the populated offsets. + RETURN_NOT_OK( + rows->AppendEmpty(/*num_rows_to_append=*/static_cast(0), + /*num_extra_bytes_to_append=*/static_cast(0))); for (size_t icol = 0; icol < batch_all_cols_.size(); ++icol) { if (batch_all_cols_[icol].metadata().is_fixed_length) { diff --git a/cpp/src/arrow/compute/row/row_internal.cc b/cpp/src/arrow/compute/row/row_internal.cc index 469205e9b008d..9ac3a4c43e541 100644 --- a/cpp/src/arrow/compute/row/row_internal.cc +++ b/cpp/src/arrow/compute/row/row_internal.cc @@ -246,13 +246,13 @@ int64_t RowTableImpl::size_rows_varying_length(int64_t num_bytes) const { } void RowTableImpl::UpdateBufferPointers() { - buffers_[0] = null_masks_->mutable_data(); + buffers_[0] = null_masks_.get(); if (metadata_.is_fixed_length) { - buffers_[1] = rows_->mutable_data(); + buffers_[1] = rows_.get(); buffers_[2] = nullptr; } else { - buffers_[1] = offsets_->mutable_data(); - buffers_[2] = rows_->mutable_data(); + buffers_[1] = offsets_.get(); + buffers_[2] = rows_.get(); } } diff --git a/cpp/src/arrow/compute/row/row_internal.h b/cpp/src/arrow/compute/row/row_internal.h index 3220b7ffe6e40..80409f93d2b96 100644 --- a/cpp/src/arrow/compute/row/row_internal.h +++ b/cpp/src/arrow/compute/row/row_internal.h @@ -189,11 +189,17 @@ class ARROW_EXPORT RowTableImpl { // Accessors into the table's buffers const uint8_t* data(int i) const { ARROW_DCHECK(i >= 0 && i < kMaxBuffers); - return buffers_[i]; + if (ARROW_PREDICT_TRUE(buffers_[i])) { + return buffers_[i]->data(); + } + return NULLPTR; } uint8_t* mutable_data(int i) { ARROW_DCHECK(i >= 0 && i < kMaxBuffers); - return buffers_[i]; + if (ARROW_PREDICT_TRUE(buffers_[i])) { + return buffers_[i]->mutable_data(); + } + return NULLPTR; } const uint32_t* offsets() const { return reinterpret_cast(data(1)); } uint32_t* mutable_offsets() { return reinterpret_cast(mutable_data(1)); } @@ -207,6 +213,12 @@ class ARROW_EXPORT RowTableImpl { /// successive calls bool has_any_nulls(const LightContext* ctx) const; + /// \brief Size of the table's buffers + int64_t buffer_size(int i) const { + ARROW_DCHECK(i >= 0 && i < kMaxBuffers); + return buffers_[i]->size(); + } + private: Status ResizeFixedLengthBuffers(int64_t num_extra_rows); Status ResizeOptionalVaryingLengthBuffer(int64_t num_extra_bytes); @@ -236,7 +248,7 @@ class ARROW_EXPORT RowTableImpl { // Stores the fixed-length parts of the rows std::unique_ptr rows_; static constexpr int kMaxBuffers = 3; - uint8_t* buffers_[kMaxBuffers]; + ResizableBuffer* buffers_[kMaxBuffers]; // The number of rows in the table int64_t num_rows_; // The number of rows that can be stored in the table without resizing diff --git a/cpp/src/arrow/compute/row/row_test.cc b/cpp/src/arrow/compute/row/row_test.cc new file mode 100644 index 0000000000000..2c1a60dfb231c --- /dev/null +++ b/cpp/src/arrow/compute/row/row_test.cc @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "arrow/compute/row/encode_internal.h" +#include "arrow/compute/row/row_internal.h" +#include "arrow/testing/generator.h" +#include "arrow/testing/gtest_util.h" + +namespace arrow { +namespace compute { + +namespace { + +Result MakeRowTableFromColumn(const std::shared_ptr& column, + int64_t num_rows, int row_alignment, + int string_alignment) { + DCHECK_GE(column->length(), num_rows); + MemoryPool* pool = default_memory_pool(); + + std::vector column_arrays; + std::vector values{column}; + ExecBatch batch(std::move(values), num_rows); + RETURN_NOT_OK(ColumnArraysFromExecBatch(batch, &column_arrays)); + + std::vector column_metadatas; + RETURN_NOT_OK(ColumnMetadatasFromExecBatch(batch, &column_metadatas)); + RowTableMetadata table_metadata; + table_metadata.FromColumnMetadataVector(column_metadatas, row_alignment, + string_alignment); + + RowTableImpl row_table; + RETURN_NOT_OK(row_table.Init(pool, table_metadata)); + + RowTableEncoder row_encoder; + row_encoder.Init(column_metadatas, row_alignment, string_alignment); + row_encoder.PrepareEncodeSelected(0, num_rows, column_arrays); + + std::vector row_ids(num_rows); + std::iota(row_ids.begin(), row_ids.end(), 0); + + RETURN_NOT_OK(row_encoder.EncodeSelected(&row_table, static_cast(num_rows), + row_ids.data())); + + return row_table; +} + +} // namespace + +// GH-43129: Ensure that the memory consumption of the row table is reasonable, that is, +// with the growth factor of 2, the actual memory usage does not exceed twice the amount +// of memory actually needed. +TEST(RowTableMemoryConsumption, Encode) { + constexpr int64_t num_rows_max = 8192; + constexpr int64_t padding_for_vectors = 64; + + ASSERT_OK_AND_ASSIGN( + auto fixed_length_column, + ::arrow::gen::Constant(std::make_shared(0))->Generate(num_rows_max)); + ASSERT_OK_AND_ASSIGN(auto var_length_column, + ::arrow::gen::Constant(std::make_shared("X")) + ->Generate(num_rows_max)); + + for (int64_t num_rows : {1023, 1024, 1025, 4095, 4096, 4097}) { + // Fixed length column. + { + SCOPED_TRACE("encoding fixed length column of " + std::to_string(num_rows) + + " rows"); + ASSERT_OK_AND_ASSIGN(auto row_table, + MakeRowTableFromColumn(fixed_length_column, num_rows, + uint32()->byte_width(), 0)); + ASSERT_NE(row_table.data(0), NULLPTR); + ASSERT_NE(row_table.data(1), NULLPTR); + ASSERT_EQ(row_table.data(2), NULLPTR); + + int64_t actual_null_mask_size = + num_rows * row_table.metadata().null_masks_bytes_per_row; + ASSERT_LE(actual_null_mask_size, row_table.buffer_size(0) - padding_for_vectors); + ASSERT_GT(actual_null_mask_size * 2, + row_table.buffer_size(0) - padding_for_vectors); + + int64_t actual_rows_size = num_rows * uint32()->byte_width(); + ASSERT_LE(actual_rows_size, row_table.buffer_size(1) - padding_for_vectors); + ASSERT_GT(actual_rows_size * 2, row_table.buffer_size(1) - padding_for_vectors); + } + + // Var length column. + { + SCOPED_TRACE("encoding var length column of " + std::to_string(num_rows) + " rows"); + ASSERT_OK_AND_ASSIGN(auto row_table, + MakeRowTableFromColumn(var_length_column, num_rows, 4, 4)); + ASSERT_NE(row_table.data(0), NULLPTR); + ASSERT_NE(row_table.data(1), NULLPTR); + ASSERT_NE(row_table.data(2), NULLPTR); + + int64_t actual_null_mask_size = + num_rows * row_table.metadata().null_masks_bytes_per_row; + ASSERT_LE(actual_null_mask_size, row_table.buffer_size(0) - padding_for_vectors); + ASSERT_GT(actual_null_mask_size * 2, + row_table.buffer_size(0) - padding_for_vectors); + + int64_t actual_offset_size = num_rows * sizeof(uint32_t); + ASSERT_LE(actual_offset_size, row_table.buffer_size(1) - padding_for_vectors); + ASSERT_GT(actual_offset_size * 2, row_table.buffer_size(1) - padding_for_vectors); + + int64_t actual_rows_size = num_rows * row_table.offsets()[1]; + ASSERT_LE(actual_rows_size, row_table.buffer_size(2) - padding_for_vectors); + ASSERT_GT(actual_rows_size * 2, row_table.buffer_size(2) - padding_for_vectors); + } + } +} + +} // namespace compute +} // namespace arrow