diff --git a/cpp/src/arrow/compute/exec/key_compare.cc b/cpp/src/arrow/compute/exec/key_compare.cc index 18e1576f90edb..07996e0613450 100644 --- a/cpp/src/arrow/compute/exec/key_compare.cc +++ b/cpp/src/arrow/compute/exec/key_compare.cc @@ -33,7 +33,7 @@ template void KeyCompare::NullUpdateColumnToRow( uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx, - const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, + const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector, bool are_cols_in_encoding_order) { if (!rows.has_any_nulls(ctx) && !col.data(0)) { return; @@ -234,7 +234,7 @@ void KeyCompare::CompareVarBinaryColumnToRowHelper( uint32_t id_varbinary_col, uint32_t first_row_to_compare, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx, - const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, + const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) { const uint32_t* offsets_left = col.offsets(); const uint32_t* offsets_right = rows.offsets(); @@ -289,7 +289,7 @@ template void KeyCompare::CompareVarBinaryColumnToRow( uint32_t id_varbinary_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col, + KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) { uint32_t num_processed = 0; #if defined(ARROW_HAVE_AVX2) @@ -325,9 +325,8 @@ void KeyCompare::CompareColumnsToRows( uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx, uint32_t* out_num_rows, uint16_t* out_sel_left_maybe_same, - const std::vector& cols, - const KeyEncoder::KeyRowArray& rows, bool are_cols_in_encoding_order, - uint8_t* out_match_bitvector_maybe_null) { + const std::vector& cols, const KeyEncoder::KeyRowArray& rows, + bool are_cols_in_encoding_order, uint8_t* out_match_bitvector_maybe_null) { if (num_rows_to_compare == 0) { *out_num_rows = 0; return; @@ -347,7 +346,7 @@ void KeyCompare::CompareColumnsToRows( bool is_first_column = true; for (size_t icol = 0; icol < cols.size(); ++icol) { - const KeyEncoder::KeyColumnArray& col = cols[icol]; + const KeyColumnArray& col = cols[icol]; if (col.metadata().is_null_type) { // If this null type col is the first column, the match_bytevector_A needs to be diff --git a/cpp/src/arrow/compute/exec/key_compare.h b/cpp/src/arrow/compute/exec/key_compare.h index 5d3a4694f73f6..fec2d717e907b 100644 --- a/cpp/src/arrow/compute/exec/key_compare.h +++ b/cpp/src/arrow/compute/exec/key_compare.h @@ -37,16 +37,15 @@ class KeyCompare { uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx, uint32_t* out_num_rows, uint16_t* out_sel_left_maybe_same, - const std::vector& cols, - const KeyEncoder::KeyRowArray& rows, bool are_cols_in_encoding_order, - uint8_t* out_match_bitvector_maybe_null = NULLPTR); + const std::vector& cols, const KeyEncoder::KeyRowArray& rows, + bool are_cols_in_encoding_order, uint8_t* out_match_bitvector_maybe_null = NULLPTR); private: template static void NullUpdateColumnToRow( uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx, - const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, + const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector, bool are_cols_in_encoding_order); template @@ -68,7 +67,7 @@ class KeyCompare { static void CompareVarBinaryColumnToRowHelper( uint32_t id_varlen_col, uint32_t first_row_to_compare, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col, + KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector); template diff --git a/cpp/src/arrow/compute/exec/key_compare_avx2.cc b/cpp/src/arrow/compute/exec/key_compare_avx2.cc index c557203088317..de7af35468835 100644 --- a/cpp/src/arrow/compute/exec/key_compare_avx2.cc +++ b/cpp/src/arrow/compute/exec/key_compare_avx2.cc @@ -604,7 +604,7 @@ uint32_t KeyCompare::CompareBinaryColumnToRow_avx2( // In this case we will access left column memory 4B at a time num_rows_safe = TailSkipForSIMD::FixBitAccess(sizeof(uint32_t), col.length(), col.bit_offset(1)); - } else if (col_width == 1 && col_width == 2) { + } else if (col_width == 1 || col_width == 2) { // In this case we will access left column memory 4B at a time num_rows_safe = TailSkipForSIMD::FixBinaryAccess(sizeof(uint32_t), col.length(), col_width); diff --git a/cpp/src/arrow/compute/exec/key_encode.cc b/cpp/src/arrow/compute/exec/key_encode.cc index 33674bd7bbf58..bda287d14c77e 100644 --- a/cpp/src/arrow/compute/exec/key_encode.cc +++ b/cpp/src/arrow/compute/exec/key_encode.cc @@ -1290,69 +1290,5 @@ Status KeyEncoder::EncodeSelected(KeyRowArray* rows, uint32_t num_selected, return Status::OK(); } -KeyEncoder::KeyColumnMetadata ColumnMetadataFromDataType( - const std::shared_ptr& type) { - if (type->id() == Type::DICTIONARY) { - auto bit_width = - arrow::internal::checked_cast(*type).bit_width(); - ARROW_DCHECK(bit_width % 8 == 0); - return KeyEncoder::KeyColumnMetadata(true, bit_width / 8); - } else if (type->id() == Type::BOOL) { - return KeyEncoder::KeyColumnMetadata(true, 0); - } else if (is_fixed_width(type->id())) { - return KeyEncoder::KeyColumnMetadata( - true, - arrow::internal::checked_cast(*type).bit_width() / 8); - } else if (is_binary_like(type->id())) { - return KeyEncoder::KeyColumnMetadata(false, sizeof(uint32_t)); - } - ARROW_DCHECK(false); - return KeyEncoder::KeyColumnMetadata(true, sizeof(int)); -} - -KeyEncoder::KeyColumnArray ColumnArrayFromArrayData( - const std::shared_ptr& array_data, int start_row, int num_rows) { - KeyEncoder::KeyColumnArray column_array = KeyEncoder::KeyColumnArray( - ColumnMetadataFromDataType(array_data->type), - array_data->offset + start_row + num_rows, - array_data->buffers[0] != NULLPTR ? array_data->buffers[0]->data() : nullptr, - array_data->buffers[1]->data(), - (array_data->buffers.size() > 2 && array_data->buffers[2] != NULLPTR) - ? array_data->buffers[2]->data() - : nullptr); - return KeyEncoder::KeyColumnArray(column_array, array_data->offset + start_row, - num_rows); -} - -void ColumnMetadatasFromExecBatch( - const ExecBatch& batch, - std::vector& column_metadatas) { - int num_columns = static_cast(batch.values.size()); - column_metadatas.resize(num_columns); - for (int i = 0; i < num_columns; ++i) { - const Datum& data = batch.values[i]; - ARROW_DCHECK(data.is_array()); - const std::shared_ptr& array_data = data.array(); - column_metadatas[i] = ColumnMetadataFromDataType(array_data->type); - } -} - -void ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows, - std::vector& column_arrays) { - int num_columns = static_cast(batch.values.size()); - column_arrays.resize(num_columns); - for (int i = 0; i < num_columns; ++i) { - const Datum& data = batch.values[i]; - ARROW_DCHECK(data.is_array()); - const std::shared_ptr& array_data = data.array(); - column_arrays[i] = ColumnArrayFromArrayData(array_data, start_row, num_rows); - } -} - -void ColumnArraysFromExecBatch(const ExecBatch& batch, - std::vector& column_arrays) { - ColumnArraysFromExecBatch(batch, 0, static_cast(batch.length), column_arrays); -} - } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/key_encode.h b/cpp/src/arrow/compute/exec/key_encode.h index 740d6d72d1ef3..c10a96ee38f60 100644 --- a/cpp/src/arrow/compute/exec/key_encode.h +++ b/cpp/src/arrow/compute/exec/key_encode.h @@ -224,8 +224,8 @@ class KeyEncoder { mutable bool has_any_nulls_; }; - void Init(const std::vector& cols, KeyEncoderContext* ctx, - int row_alignment, int string_alignment); + void Init(const std::vector& cols, int row_alignment, + int string_alignment); const KeyRowMetadata& row_metadata() { return row_metadata_; } @@ -502,16 +502,5 @@ inline void KeyEncoder::EncoderVarBinary::DecodeHelper( } } -KeyEncoder::KeyColumnMetadata ColumnMetadataFromDataType( - const std::shared_ptr& type); -KeyEncoder::KeyColumnArray ColumnArrayFromArrayData( - const std::shared_ptr& array_data, int start_row, int num_rows); -void ColumnMetadatasFromExecBatch( - const ExecBatch& batch, std::vector& column_metadatas); -void ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows, - std::vector& column_arrays); -void ColumnArraysFromExecBatch(const ExecBatch& batch, - std::vector& column_arrays); - } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/key_hash.cc b/cpp/src/arrow/compute/exec/key_hash.cc index 5a84845844ef3..0336e4c94ec4d 100644 --- a/cpp/src/arrow/compute/exec/key_hash.cc +++ b/cpp/src/arrow/compute/exec/key_hash.cc @@ -456,15 +456,16 @@ void Hashing32::HashMultiColumn(const std::vector& cols, } } -void Hashing32::HashBatch(const ExecBatch& key_batch, int start_row, int num_rows, - uint32_t* hashes, - std::vector& column_arrays, - int64_t hardware_flags, util::TempVectorStack* temp_stack) { - ColumnArraysFromExecBatch(key_batch, start_row, num_rows, column_arrays); +Status Hashing32::HashBatch(const ExecBatch& key_batch, int start_row, int num_rows, + uint32_t* hashes, std::vector& column_arrays, + int64_t hardware_flags, util::TempVectorStack* temp_stack) { + RETURN_NOT_OK( + ColumnArraysFromExecBatch(key_batch, start_row, num_rows, &column_arrays)); KeyEncoder::KeyEncoderContext ctx; ctx.hardware_flags = hardware_flags; ctx.stack = temp_stack; HashMultiColumn(column_arrays, &ctx, hashes); + return Status::OK(); } inline uint64_t Hashing64::Avalanche(uint64_t acc) { @@ -886,15 +887,16 @@ void Hashing64::HashMultiColumn(const std::vector& cols, } } -void Hashing64::HashBatch(const ExecBatch& key_batch, int start_row, int num_rows, - uint64_t* hashes, - std::vector& column_arrays, - int64_t hardware_flags, util::TempVectorStack* temp_stack) { - ColumnArraysFromExecBatch(key_batch, start_row, num_rows, column_arrays); +Status Hashing64::HashBatch(const ExecBatch& key_batch, int start_row, int num_rows, + uint64_t* hashes, std::vector& column_arrays, + int64_t hardware_flags, util::TempVectorStack* temp_stack) { + RETURN_NOT_OK( + ColumnArraysFromExecBatch(key_batch, start_row, num_rows, &column_arrays)); KeyEncoder::KeyEncoderContext ctx; ctx.hardware_flags = hardware_flags; ctx.stack = temp_stack; HashMultiColumn(column_arrays, &ctx, hashes); + return Status::OK(); } } // namespace compute diff --git a/cpp/src/arrow/compute/exec/key_hash.h b/cpp/src/arrow/compute/exec/key_hash.h index b23b5c855db4a..0bed2e74f3924 100644 --- a/cpp/src/arrow/compute/exec/key_hash.h +++ b/cpp/src/arrow/compute/exec/key_hash.h @@ -48,10 +48,9 @@ class ARROW_EXPORT Hashing32 { static void HashMultiColumn(const std::vector& cols, KeyEncoder::KeyEncoderContext* ctx, uint32_t* out_hash); - static void HashBatch(const ExecBatch& key_batch, int start_row, int num_rows, - uint32_t* hashes, - std::vector& column_arrays, - int64_t hardware_flags, util::TempVectorStack* temp_stack); + static Status HashBatch(const ExecBatch& key_batch, int start_row, int num_rows, + uint32_t* hashes, std::vector& column_arrays, + int64_t hardware_flags, util::TempVectorStack* temp_stack); private: static const uint32_t PRIME32_1 = 0x9E3779B1; @@ -161,10 +160,9 @@ class ARROW_EXPORT Hashing64 { static void HashMultiColumn(const std::vector& cols, KeyEncoder::KeyEncoderContext* ctx, uint64_t* hashes); - static void HashBatch(const ExecBatch& key_batch, int start_row, int num_rows, - uint64_t* hashes, - std::vector& column_arrays, - int64_t hardware_flags, util::TempVectorStack* temp_stack); + static Status HashBatch(const ExecBatch& key_batch, int start_row, int num_rows, + uint64_t* hashes, std::vector& column_arrays, + int64_t hardware_flags, util::TempVectorStack* temp_stack); private: static const uint64_t PRIME64_1 = 0x9E3779B185EBCA87ULL; diff --git a/cpp/src/arrow/compute/exec/swiss_join.cc b/cpp/src/arrow/compute/exec/swiss_join.cc index 62beba35db14d..c0d6cbedd1ce7 100644 --- a/cpp/src/arrow/compute/exec/swiss_join.cc +++ b/cpp/src/arrow/compute/exec/swiss_join.cc @@ -33,566 +33,6 @@ namespace arrow { namespace compute { -void ResizableArrayData::Init(const std::shared_ptr& data_type, - MemoryPool* pool, int log_num_rows_min) { -#ifndef NDEBUG - if (num_rows_allocated_ > 0) { - ARROW_DCHECK(data_type_ != NULLPTR); - KeyEncoder::KeyColumnMetadata metadata_before = - ColumnMetadataFromDataType(data_type_); - KeyEncoder::KeyColumnMetadata metadata_after = ColumnMetadataFromDataType(data_type); - ARROW_DCHECK(metadata_before.is_fixed_length == metadata_after.is_fixed_length && - metadata_before.fixed_length == metadata_after.fixed_length); - } -#endif - Clear(/*release_buffers=*/false); - log_num_rows_min_ = log_num_rows_min; - data_type_ = data_type; - pool_ = pool; -} - -void ResizableArrayData::Clear(bool release_buffers) { - num_rows_ = 0; - if (release_buffers) { - non_null_buf_.reset(); - fixed_len_buf_.reset(); - var_len_buf_.reset(); - num_rows_allocated_ = 0; - var_len_buf_size_ = 0; - } -} - -Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) { - ARROW_DCHECK(num_rows_new >= 0); - if (num_rows_new <= num_rows_allocated_) { - num_rows_ = num_rows_new; - return Status::OK(); - } - - int num_rows_allocated_new = 1 << log_num_rows_min_; - while (num_rows_allocated_new < num_rows_new) { - num_rows_allocated_new *= 2; - } - - KeyEncoder::KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(data_type_); - - if (fixed_len_buf_ == NULLPTR) { - ARROW_DCHECK(non_null_buf_ == NULLPTR && var_len_buf_ == NULLPTR); - - ARROW_ASSIGN_OR_RAISE( - non_null_buf_, - AllocateResizableBuffer( - bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, pool_)); - if (column_metadata.is_fixed_length) { - if (column_metadata.fixed_length == 0) { - ARROW_ASSIGN_OR_RAISE( - fixed_len_buf_, - AllocateResizableBuffer( - bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, - pool_)); - } else { - ARROW_ASSIGN_OR_RAISE( - fixed_len_buf_, - AllocateResizableBuffer( - num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes, - pool_)); - } - } else { - ARROW_ASSIGN_OR_RAISE( - fixed_len_buf_, - AllocateResizableBuffer( - (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes, pool_)); - } - - ARROW_ASSIGN_OR_RAISE(var_len_buf_, AllocateResizableBuffer( - sizeof(uint64_t) + kNumPaddingBytes, pool_)); - - var_len_buf_size_ = sizeof(uint64_t); - } else { - ARROW_DCHECK(non_null_buf_ != NULLPTR && var_len_buf_ != NULLPTR); - - RETURN_NOT_OK(non_null_buf_->Resize(bit_util::BytesForBits(num_rows_allocated_new) + - kNumPaddingBytes)); - - if (column_metadata.is_fixed_length) { - if (column_metadata.fixed_length == 0) { - RETURN_NOT_OK(fixed_len_buf_->Resize( - bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes)); - } else { - RETURN_NOT_OK(fixed_len_buf_->Resize( - num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes)); - } - } else { - RETURN_NOT_OK(fixed_len_buf_->Resize( - (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes)); - } - } - - num_rows_allocated_ = num_rows_allocated_new; - num_rows_ = num_rows_new; - - return Status::OK(); -} - -Status ResizableArrayData::ResizeVaryingLengthBuffer() { - KeyEncoder::KeyColumnMetadata column_metadata; - column_metadata = ColumnMetadataFromDataType(data_type_); - - if (!column_metadata.is_fixed_length) { - int min_new_size = static_cast( - reinterpret_cast(fixed_len_buf_->data())[num_rows_]); - ARROW_DCHECK(var_len_buf_size_ > 0); - if (var_len_buf_size_ < min_new_size) { - int new_size = var_len_buf_size_; - while (new_size < min_new_size) { - new_size *= 2; - } - RETURN_NOT_OK(var_len_buf_->Resize(new_size + kNumPaddingBytes)); - var_len_buf_size_ = new_size; - } - } - - return Status::OK(); -} - -KeyEncoder::KeyColumnArray ResizableArrayData::column_array() const { - KeyEncoder::KeyColumnMetadata column_metadata; - column_metadata = ColumnMetadataFromDataType(data_type_); - return KeyEncoder::KeyColumnArray( - column_metadata, num_rows_, non_null_buf_->mutable_data(), - fixed_len_buf_->mutable_data(), var_len_buf_->mutable_data()); -} - -std::shared_ptr ResizableArrayData::array_data() const { - KeyEncoder::KeyColumnMetadata column_metadata; - column_metadata = ColumnMetadataFromDataType(data_type_); - - auto valid_count = arrow::internal::CountSetBits(non_null_buf_->data(), /*offset=*/0, - static_cast(num_rows_)); - int null_count = static_cast(num_rows_) - static_cast(valid_count); - - if (column_metadata.is_fixed_length) { - return ArrayData::Make(data_type_, num_rows_, {non_null_buf_, fixed_len_buf_}, - null_count); - } else { - return ArrayData::Make(data_type_, num_rows_, - {non_null_buf_, fixed_len_buf_, var_len_buf_}, null_count); - } -} - -int ExecBatchBuilder::NumRowsToSkip(const std::shared_ptr& column, - int num_rows, const uint16_t* row_ids, - int num_tail_bytes_to_skip) { -#ifndef NDEBUG - // Ids must be in non-decreasing order - // - for (int i = 1; i < num_rows; ++i) { - ARROW_DCHECK(row_ids[i] >= row_ids[i - 1]); - } -#endif - - KeyEncoder::KeyColumnMetadata column_metadata = - ColumnMetadataFromDataType(column->type); - - int num_rows_left = num_rows; - int num_bytes_skipped = 0; - while (num_rows_left > 0 && num_bytes_skipped < num_tail_bytes_to_skip) { - if (column_metadata.is_fixed_length) { - if (column_metadata.fixed_length == 0) { - num_rows_left = std::max(num_rows_left, 8) - 8; - ++num_bytes_skipped; - } else { - --num_rows_left; - num_bytes_skipped += column_metadata.fixed_length; - } - } else { - --num_rows_left; - int row_id_removed = row_ids[num_rows_left]; - const uint32_t* offsets = - reinterpret_cast(column->buffers[1]->data()); - num_bytes_skipped += offsets[row_id_removed + 1] - offsets[row_id_removed]; - } - } - - return num_rows - num_rows_left; -} - -template -void ExecBatchBuilder::CollectBitsImp(const uint8_t* input_bits, - int64_t input_bits_offset, uint8_t* output_bits, - int64_t output_bits_offset, int num_rows, - const uint16_t* row_ids) { - if (!OUTPUT_BYTE_ALIGNED) { - ARROW_DCHECK(output_bits_offset % 8 > 0); - output_bits[output_bits_offset / 8] &= - static_cast((1 << (output_bits_offset % 8)) - 1); - } else { - ARROW_DCHECK(output_bits_offset % 8 == 0); - } - constexpr int unroll = 8; - for (int i = 0; i < num_rows / unroll; ++i) { - const uint16_t* row_ids_base = row_ids + unroll * i; - uint8_t result; - result = bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[0]) ? 1 : 0; - result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[1]) ? 2 : 0; - result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[2]) ? 4 : 0; - result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[3]) ? 8 : 0; - result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[4]) ? 16 : 0; - result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[5]) ? 32 : 0; - result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[6]) ? 64 : 0; - result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[7]) ? 128 : 0; - if (OUTPUT_BYTE_ALIGNED) { - output_bits[output_bits_offset / 8 + i] = result; - } else { - output_bits[output_bits_offset / 8 + i] |= - static_cast(result << (output_bits_offset % 8)); - output_bits[output_bits_offset / 8 + i + 1] = - static_cast(result >> (8 - (output_bits_offset % 8))); - } - } - if (num_rows % unroll > 0) { - for (int i = num_rows - (num_rows % unroll); i < num_rows; ++i) { - bit_util::SetBitTo(output_bits, output_bits_offset + i, - bit_util::GetBit(input_bits, input_bits_offset + row_ids[i])); - } - } -} - -void ExecBatchBuilder::CollectBits(const uint8_t* input_bits, int64_t input_bits_offset, - uint8_t* output_bits, int64_t output_bits_offset, - int num_rows, const uint16_t* row_ids) { - if (output_bits_offset % 8 > 0) { - CollectBitsImp(input_bits, input_bits_offset, output_bits, output_bits_offset, - num_rows, row_ids); - } else { - CollectBitsImp(input_bits, input_bits_offset, output_bits, output_bits_offset, - num_rows, row_ids); - } -} - -template -void ExecBatchBuilder::Visit(const std::shared_ptr& column, int num_rows, - const uint16_t* row_ids, PROCESS_VALUE_FN process_value_fn) { - KeyEncoder::KeyColumnMetadata metadata = ColumnMetadataFromDataType(column->type); - - if (!metadata.is_fixed_length) { - const uint8_t* ptr_base = column->buffers[2]->data(); - const uint32_t* offsets = - reinterpret_cast(column->buffers[1]->data()) + column->offset; - for (int i = 0; i < num_rows; ++i) { - uint16_t row_id = row_ids[i]; - const uint8_t* field_ptr = ptr_base + offsets[row_id]; - uint32_t field_length = offsets[row_id + 1] - offsets[row_id]; - process_value_fn(i, field_ptr, field_length); - } - } else { - ARROW_DCHECK(metadata.fixed_length > 0); - for (int i = 0; i < num_rows; ++i) { - uint16_t row_id = row_ids[i]; - const uint8_t* field_ptr = - column->buffers[1]->data() + - (column->offset + row_id) * static_cast(metadata.fixed_length); - process_value_fn(i, field_ptr, metadata.fixed_length); - } - } -} - -Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source, - ResizableArrayData& target, - int num_rows_to_append, const uint16_t* row_ids, - MemoryPool* pool) { - int num_rows_before = target.num_rows(); - ARROW_DCHECK(num_rows_before >= 0); - int num_rows_after = num_rows_before + num_rows_to_append; - if (target.num_rows() == 0) { - target.Init(source->type, pool, kLogNumRows); - } - RETURN_NOT_OK(target.ResizeFixedLengthBuffers(num_rows_after)); - - KeyEncoder::KeyColumnMetadata column_metadata = - ColumnMetadataFromDataType(source->type); - - if (column_metadata.is_fixed_length) { - // Fixed length column - // - uint32_t fixed_length = column_metadata.fixed_length; - switch (fixed_length) { - case 0: - CollectBits(source->buffers[1]->data(), source->offset, target.mutable_data(1), - num_rows_before, num_rows_to_append, row_ids); - break; - case 1: - Visit(source, num_rows_to_append, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { - target.mutable_data(1)[num_rows_before + i] = *ptr; - }); - break; - case 2: - Visit(source, num_rows_to_append, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { - reinterpret_cast(target.mutable_data(1))[num_rows_before + i] = - *reinterpret_cast(ptr); - }); - break; - case 4: - Visit(source, num_rows_to_append, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { - reinterpret_cast(target.mutable_data(1))[num_rows_before + i] = - *reinterpret_cast(ptr); - }); - break; - case 8: - Visit(source, num_rows_to_append, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { - reinterpret_cast(target.mutable_data(1))[num_rows_before + i] = - *reinterpret_cast(ptr); - }); - break; - default: { - int num_rows_to_process = - num_rows_to_append - - NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t)); - Visit(source, num_rows_to_process, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { - uint64_t* dst = reinterpret_cast( - target.mutable_data(1) + - static_cast(num_bytes) * (num_rows_before + i)); - const uint64_t* src = reinterpret_cast(ptr); - for (uint32_t word_id = 0; - word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t)); - ++word_id) { - util::SafeStore(dst + word_id, util::SafeLoad(src + word_id)); - } - }); - if (num_rows_to_append > num_rows_to_process) { - Visit(source, num_rows_to_append - num_rows_to_process, - row_ids + num_rows_to_process, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { - uint64_t* dst = reinterpret_cast( - target.mutable_data(1) + - static_cast(num_bytes) * - (num_rows_before + num_rows_to_process + i)); - const uint64_t* src = reinterpret_cast(ptr); - memcpy(dst, src, num_bytes); - }); - } - } - } - } else { - // Varying length column - // - - // Step 1: calculate target offsets - // - uint32_t* offsets = reinterpret_cast(target.mutable_data(1)); - uint32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before]; - Visit(source, num_rows_to_append, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { - offsets[num_rows_before + i] = num_bytes; - }); - for (int i = 0; i < num_rows_to_append; ++i) { - uint32_t length = offsets[num_rows_before + i]; - offsets[num_rows_before + i] = sum; - sum += length; - } - offsets[num_rows_before + num_rows_to_append] = sum; - - // Step 2: resize output buffers - // - RETURN_NOT_OK(target.ResizeVaryingLengthBuffer()); - - // Step 3: copy varying-length data - // - int num_rows_to_process = - num_rows_to_append - - NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t)); - Visit(source, num_rows_to_process, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { - uint64_t* dst = reinterpret_cast(target.mutable_data(2) + - offsets[num_rows_before + i]); - const uint64_t* src = reinterpret_cast(ptr); - for (uint32_t word_id = 0; - word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t)); ++word_id) { - util::SafeStore(dst + word_id, util::SafeLoad(src + word_id)); - } - }); - Visit(source, num_rows_to_append - num_rows_to_process, row_ids + num_rows_to_process, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { - uint64_t* dst = reinterpret_cast( - target.mutable_data(2) + - offsets[num_rows_before + num_rows_to_process + i]); - const uint64_t* src = reinterpret_cast(ptr); - memcpy(dst, src, num_bytes); - }); - } - - // Process nulls - // - if (source->buffers[0] == NULLPTR) { - uint8_t* dst = target.mutable_data(0); - dst[num_rows_before / 8] |= static_cast(~0ULL << (num_rows_before & 7)); - for (int i = num_rows_before / 8 + 1; - i < bit_util::BytesForBits(num_rows_before + num_rows_to_append); ++i) { - dst[i] = 0xff; - } - } else { - CollectBits(source->buffers[0]->data(), source->offset, target.mutable_data(0), - num_rows_before, num_rows_to_append, row_ids); - } - - return Status::OK(); -} - -Status ExecBatchBuilder::AppendNulls(const std::shared_ptr& type, - ResizableArrayData& target, int num_rows_to_append, - MemoryPool* pool) { - int num_rows_before = target.num_rows(); - int num_rows_after = num_rows_before + num_rows_to_append; - if (target.num_rows() == 0) { - target.Init(type, pool, kLogNumRows); - } - RETURN_NOT_OK(target.ResizeFixedLengthBuffers(num_rows_after)); - - KeyEncoder::KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(type); - - // Process fixed length buffer - // - if (column_metadata.is_fixed_length) { - uint8_t* dst = target.mutable_data(1); - if (column_metadata.fixed_length == 0) { - dst[num_rows_before / 8] &= static_cast((1 << (num_rows_before % 8)) - 1); - int64_t offset_begin = num_rows_before / 8 + 1; - int64_t offset_end = bit_util::BytesForBits(num_rows_after); - if (offset_end > offset_begin) { - memset(dst + offset_begin, 0, offset_end - offset_begin); - } - } else { - memset(dst + num_rows_before * static_cast(column_metadata.fixed_length), - 0, static_cast(column_metadata.fixed_length) * num_rows_to_append); - } - } else { - uint32_t* dst = reinterpret_cast(target.mutable_data(1)); - uint32_t sum = num_rows_before == 0 ? 0 : dst[num_rows_before]; - for (int64_t i = num_rows_before; i <= num_rows_after; ++i) { - dst[i] = sum; - } - } - - // Process nulls - // - uint8_t* dst = target.mutable_data(0); - dst[num_rows_before / 8] &= static_cast((1 << (num_rows_before % 8)) - 1); - int64_t offset_begin = num_rows_before / 8 + 1; - int64_t offset_end = bit_util::BytesForBits(num_rows_after); - if (offset_end > offset_begin) { - memset(dst + offset_begin, 0, offset_end - offset_begin); - } - - return Status::OK(); -} - -Status ExecBatchBuilder::AppendSelected(MemoryPool* pool, const ExecBatch& batch, - int num_rows_to_append, const uint16_t* row_ids, - int num_cols, const int* col_ids) { - if (num_rows_to_append == 0) { - return Status::OK(); - } - // If this is the first time we append rows, then initialize output buffers. - // - if (values_.empty()) { - values_.resize(num_cols); - for (int i = 0; i < num_cols; ++i) { - const Datum& data = batch.values[col_ids ? col_ids[i] : i]; - ARROW_DCHECK(data.is_array()); - const std::shared_ptr& array_data = data.array(); - values_[i].Init(array_data->type, pool, kLogNumRows); - } - } - - for (size_t i = 0; i < values_.size(); ++i) { - const Datum& data = batch.values[col_ids ? col_ids[i] : i]; - ARROW_DCHECK(data.is_array()); - const std::shared_ptr& array_data = data.array(); - RETURN_NOT_OK( - AppendSelected(array_data, values_[i], num_rows_to_append, row_ids, pool)); - } - - return Status::OK(); -} - -Status ExecBatchBuilder::AppendSelected(MemoryPool* pool, const ExecBatch& batch, - int num_rows_to_append, const uint16_t* row_ids, - int* num_appended, int num_cols, - const int* col_ids) { - *num_appended = 0; - if (num_rows_to_append == 0) { - return Status::OK(); - } - int num_rows_max = 1 << kLogNumRows; - int num_rows_present = num_rows(); - if (num_rows_present >= num_rows_max) { - return Status::OK(); - } - int num_rows_available = num_rows_max - num_rows_present; - int num_rows_next = std::min(num_rows_available, num_rows_to_append); - RETURN_NOT_OK(AppendSelected(pool, batch, num_rows_next, row_ids, num_cols, col_ids)); - *num_appended = num_rows_next; - return Status::OK(); -} - -Status ExecBatchBuilder::AppendNulls(MemoryPool* pool, - const std::vector>& types, - int num_rows_to_append) { - if (num_rows_to_append == 0) { - return Status::OK(); - } - - // If this is the first time we append rows, then initialize output buffers. - // - if (values_.empty()) { - values_.resize(types.size()); - for (size_t i = 0; i < types.size(); ++i) { - values_[i].Init(types[i], pool, kLogNumRows); - } - } - - for (size_t i = 0; i < values_.size(); ++i) { - RETURN_NOT_OK(AppendNulls(types[i], values_[i], num_rows_to_append, pool)); - } - - return Status::OK(); -} - -Status ExecBatchBuilder::AppendNulls(MemoryPool* pool, - const std::vector>& types, - int num_rows_to_append, int* num_appended) { - *num_appended = 0; - if (num_rows_to_append == 0) { - return Status::OK(); - } - int num_rows_max = 1 << kLogNumRows; - int num_rows_present = num_rows(); - if (num_rows_present >= num_rows_max) { - return Status::OK(); - } - int num_rows_available = num_rows_max - num_rows_present; - int num_rows_next = std::min(num_rows_available, num_rows_to_append); - RETURN_NOT_OK(AppendNulls(pool, types, num_rows_next)); - *num_appended = num_rows_next; - return Status::OK(); -} - -ExecBatch ExecBatchBuilder::Flush() { - ARROW_DCHECK(num_rows() > 0); - ExecBatch out({}, num_rows()); - out.values.resize(values_.size()); - for (size_t i = 0; i < values_.size(); ++i) { - out.values[i] = values_[i].array_data(); - values_[i].Clear(true); - } - return out; -} - int RowArrayAccessor::VarbinaryColumnId(const KeyEncoder::KeyRowMetadata& row_metadata, int column_id) { ARROW_DCHECK(row_metadata.num_cols() > static_cast(column_id)); @@ -766,8 +206,8 @@ Status RowArray::InitIfNeeded(MemoryPool* pool, const ExecBatch& batch) { if (is_initialized_) { return Status::OK(); } - std::vector column_metadatas; - ColumnMetadatasFromExecBatch(batch, column_metadatas); + std::vector column_metadatas; + RETURN_NOT_OK(ColumnMetadatasFromExecBatch(batch, &column_metadatas)); KeyEncoder::KeyRowMetadata row_metadata; row_metadata.FromColumnMetadataVector(column_metadatas, sizeof(uint64_t), sizeof(uint64_t)); @@ -775,13 +215,13 @@ Status RowArray::InitIfNeeded(MemoryPool* pool, const ExecBatch& batch) { return InitIfNeeded(pool, row_metadata); } -Status RowArray::AppendBatchSelection( - MemoryPool* pool, const ExecBatch& batch, int begin_row_id, int end_row_id, - int num_row_ids, const uint16_t* row_ids, - std::vector& temp_column_arrays) { +Status RowArray::AppendBatchSelection(MemoryPool* pool, const ExecBatch& batch, + int begin_row_id, int end_row_id, int num_row_ids, + const uint16_t* row_ids, + std::vector& temp_column_arrays) { RETURN_NOT_OK(InitIfNeeded(pool, batch)); - ColumnArraysFromExecBatch(batch, begin_row_id, end_row_id - begin_row_id, - temp_column_arrays); + RETURN_NOT_OK(ColumnArraysFromExecBatch(batch, begin_row_id, end_row_id - begin_row_id, + &temp_column_arrays)); encoder_.PrepareEncodeSelected( /*start_row=*/0, end_row_id - begin_row_id, temp_column_arrays); RETURN_NOT_OK(encoder_.EncodeSelected(&rows_temp_, num_row_ids, row_ids)); @@ -794,10 +234,11 @@ void RowArray::Compare(const ExecBatch& batch, int begin_row_id, int end_row_id, const uint32_t* array_row_ids, uint32_t* out_num_not_equal, uint16_t* out_not_equal_selection, int64_t hardware_flags, util::TempVectorStack* temp_stack, - std::vector& temp_column_arrays, + std::vector& temp_column_arrays, uint8_t* out_match_bitvector_maybe_null) { - ColumnArraysFromExecBatch(batch, begin_row_id, end_row_id - begin_row_id, - temp_column_arrays); + Status status = ColumnArraysFromExecBatch( + batch, begin_row_id, end_row_id - begin_row_id, &temp_column_arrays); + ARROW_DCHECK(status.ok()); KeyEncoder::KeyEncoderContext ctx; ctx.hardware_flags = hardware_flags; @@ -819,7 +260,7 @@ Status RowArray::DecodeSelected(ResizableArrayData* output, int column_id, // instructions. // - KeyEncoder::KeyColumnMetadata column_metadata = output->column_metadata(); + ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata column_metadata, output->column_metadata()); if (column_metadata.is_fixed_length) { uint32_t fixed_length = column_metadata.fixed_length; @@ -1322,10 +763,10 @@ void SwissTableMerge::InsertNewGroups(SwissTable* target, } } -SwissTableWithKeys::Input::Input( - const ExecBatch* in_batch, int in_batch_start_row, int in_batch_end_row, - util::TempVectorStack* in_temp_stack, - std::vector* in_temp_column_arrays) +SwissTableWithKeys::Input::Input(const ExecBatch* in_batch, int in_batch_start_row, + int in_batch_end_row, + util::TempVectorStack* in_temp_stack, + std::vector* in_temp_column_arrays) : batch(in_batch), batch_start_row(in_batch_start_row), batch_end_row(in_batch_end_row), @@ -1335,9 +776,9 @@ SwissTableWithKeys::Input::Input( temp_column_arrays(in_temp_column_arrays), temp_group_ids(nullptr) {} -SwissTableWithKeys::Input::Input( - const ExecBatch* in_batch, util::TempVectorStack* in_temp_stack, - std::vector* in_temp_column_arrays) +SwissTableWithKeys::Input::Input(const ExecBatch* in_batch, + util::TempVectorStack* in_temp_stack, + std::vector* in_temp_column_arrays) : batch(in_batch), batch_start_row(0), batch_end_row(static_cast(in_batch->length)), @@ -1347,11 +788,11 @@ SwissTableWithKeys::Input::Input( temp_column_arrays(in_temp_column_arrays), temp_group_ids(nullptr) {} -SwissTableWithKeys::Input::Input( - const ExecBatch* in_batch, int in_num_selected, const uint16_t* in_selection, - util::TempVectorStack* in_temp_stack, - std::vector* in_temp_column_arrays, - std::vector* in_temp_group_ids) +SwissTableWithKeys::Input::Input(const ExecBatch* in_batch, int in_num_selected, + const uint16_t* in_selection, + util::TempVectorStack* in_temp_stack, + std::vector* in_temp_column_arrays, + std::vector* in_temp_group_ids) : batch(in_batch), batch_start_row(0), batch_end_row(static_cast(in_batch->length)), @@ -1518,9 +959,11 @@ void SwissTableWithKeys::Hash(Input* input, uint32_t* hashes, int64_t hardware_f // ARROW_DCHECK(input->selection_maybe_null == nullptr); - Hashing32::HashBatch(*input->batch, input->batch_start_row, - input->batch_end_row - input->batch_start_row, hashes, - *input->temp_column_arrays, hardware_flags, input->temp_stack); + Status status = + Hashing32::HashBatch(*input->batch, input->batch_start_row, + input->batch_end_row - input->batch_start_row, hashes, + *input->temp_column_arrays, hardware_flags, input->temp_stack); + ARROW_DCHECK(status.ok()); } void SwissTableWithKeys::MapReadOnly(Input* input, const uint32_t* hashes, @@ -1598,10 +1041,10 @@ Status SwissTableWithKeys::Map(Input* input, bool insert_missing, const uint32_t return Status::OK(); } -void SwissTableForJoin::Lookup( - const ExecBatch& batch, int start_row, int num_rows, uint8_t* out_has_match_bitvector, - uint32_t* out_key_ids, util::TempVectorStack* temp_stack, - std::vector* temp_column_arrays) { +void SwissTableForJoin::Lookup(const ExecBatch& batch, int start_row, int num_rows, + uint8_t* out_has_match_bitvector, uint32_t* out_key_ids, + util::TempVectorStack* temp_stack, + std::vector* temp_column_arrays) { SwissTableWithKeys::Input input(&batch, start_row, start_row + num_rows, temp_stack, temp_column_arrays); @@ -1708,11 +1151,11 @@ void SwissTableForJoin::payload_ids_to_key_ids(int num_rows, const uint32_t* pay } } -Status SwissTableForJoinBuild::Init( - SwissTableForJoin* target, int dop, int64_t num_rows, bool reject_duplicate_keys, - bool no_payload, const std::vector& key_types, - const std::vector& payload_types, MemoryPool* pool, - int64_t hardware_flags) { +Status SwissTableForJoinBuild::Init(SwissTableForJoin* target, int dop, int64_t num_rows, + bool reject_duplicate_keys, bool no_payload, + const std::vector& key_types, + const std::vector& payload_types, + MemoryPool* pool, int64_t hardware_flags) { target_ = target; dop_ = dop; num_rows_ = num_rows; @@ -1769,9 +1212,10 @@ Status SwissTableForJoinBuild::PushNextBatch(int64_t thread_id, // Compute hash // locals.batch_hashes.resize(key_batch.length); - Hashing32::HashBatch(key_batch, /*start_row=*/0, static_cast(key_batch.length), - locals.batch_hashes.data(), locals.temp_column_arrays, - hardware_flags_, temp_stack); + RETURN_NOT_OK( + Hashing32::HashBatch(key_batch, /*start_row=*/0, static_cast(key_batch.length), + locals.batch_hashes.data(), locals.temp_column_arrays, + hardware_flags_, temp_stack)); // Partition on hash // @@ -2431,10 +1875,10 @@ void JoinProbeProcessor::Init(int num_key_columns, JoinType join_type, output_batch_fn_ = output_batch_fn; } -Status JoinProbeProcessor::OnNextBatch( - int64_t thread_id, const ExecBatch& keypayload_batch, - util::TempVectorStack* temp_stack, - std::vector* temp_column_arrays) { +Status JoinProbeProcessor::OnNextBatch(int64_t thread_id, + const ExecBatch& keypayload_batch, + util::TempVectorStack* temp_stack, + std::vector* temp_column_arrays) { const SwissTable* swiss_table = hash_table_->keys()->swiss_table(); int64_t hardware_flags = swiss_table->hardware_flags(); int minibatch_size = swiss_table->minibatch_size(); @@ -2868,15 +2312,19 @@ class SwissJoin : public HashJoinImpl { batches.CloseAll(); - std::vector key_types; + std::vector key_types; for (int i = 0; i < schema->num_cols(HashJoinProjection::KEY); ++i) { - key_types.push_back( + ARROW_ASSIGN_OR_RAISE( + KeyColumnMetadata metadata, ColumnMetadataFromDataType(schema->data_type(HashJoinProjection::KEY, i))); + key_types.push_back(metadata); } - std::vector payload_types; + std::vector payload_types; for (int i = 0; i < schema->num_cols(HashJoinProjection::PAYLOAD); ++i) { - payload_types.push_back( + ARROW_ASSIGN_OR_RAISE( + KeyColumnMetadata metadata, ColumnMetadataFromDataType(schema->data_type(HashJoinProjection::PAYLOAD, i))); + payload_types.push_back(metadata); } RETURN_NOT_OK(CancelIfNotOK(hash_table_build_.Init( &hash_table_, num_threads_, batches.num_shared_rows(), reject_duplicate_keys, @@ -3079,9 +2527,10 @@ class SwissJoin : public HashJoinImpl { // Get the list of key and payload ids from this mini-batch to output. // - uint32_t first_key_id = hash_table_.payload_id_to_key_id(mini_batch_start); - uint32_t last_key_id = - hash_table_.payload_id_to_key_id(mini_batch_start + mini_batch_size_next - 1); + uint32_t first_key_id = + hash_table_.payload_id_to_key_id(static_cast(mini_batch_start)); + uint32_t last_key_id = hash_table_.payload_id_to_key_id( + static_cast(mini_batch_start + mini_batch_size_next - 1)); int num_output_rows = 0; for (uint32_t key_id = first_key_id; key_id <= last_key_id; ++key_id) { if (bit_util::GetBit(hash_table_.has_match(), key_id) == bit_to_output) { @@ -3237,7 +2686,7 @@ class SwissJoin : public HashJoinImpl { struct ThreadLocalState { JoinResultMaterialize materialize; util::TempVectorStack temp_stack; - std::vector temp_column_arrays; + std::vector temp_column_arrays; int64_t num_output_batches; bool hash_table_ready; }; diff --git a/cpp/src/arrow/compute/exec/swiss_join.h b/cpp/src/arrow/compute/exec/swiss_join.h index d419cc0c62e0d..2e2256761c209 100644 --- a/cpp/src/arrow/compute/exec/swiss_join.h +++ b/cpp/src/arrow/compute/exec/swiss_join.h @@ -24,122 +24,11 @@ #include "arrow/compute/exec/partition_util.h" #include "arrow/compute/exec/schema_util.h" #include "arrow/compute/exec/task_util.h" +#include "arrow/compute/light_array.h" namespace arrow { namespace compute { -class ResizableArrayData { - public: - ResizableArrayData() - : log_num_rows_min_(0), - pool_(NULLPTR), - num_rows_(0), - num_rows_allocated_(0), - var_len_buf_size_(0) {} - ~ResizableArrayData() { Clear(true); } - void Init(const std::shared_ptr& data_type, MemoryPool* pool, - int log_num_rows_min); - void Clear(bool release_buffers); - Status ResizeFixedLengthBuffers(int num_rows_new); - Status ResizeVaryingLengthBuffer(); - int num_rows() const { return num_rows_; } - KeyEncoder::KeyColumnArray column_array() const; - KeyEncoder::KeyColumnMetadata column_metadata() const { - return ColumnMetadataFromDataType(data_type_); - } - std::shared_ptr array_data() const; - uint8_t* mutable_data(int i) { - return i == 0 ? non_null_buf_->mutable_data() - : i == 1 ? fixed_len_buf_->mutable_data() - : var_len_buf_->mutable_data(); - } - - private: - static constexpr int64_t kNumPaddingBytes = 64; - int log_num_rows_min_; - std::shared_ptr data_type_; - MemoryPool* pool_; - int num_rows_; - int num_rows_allocated_; - int var_len_buf_size_; - std::shared_ptr non_null_buf_; - std::shared_ptr fixed_len_buf_; - std::shared_ptr var_len_buf_; -}; - -class ExecBatchBuilder { - public: - static Status AppendSelected(const std::shared_ptr& source, - ResizableArrayData& target, int num_rows_to_append, - const uint16_t* row_ids, MemoryPool* pool); - - static Status AppendNulls(const std::shared_ptr& type, - ResizableArrayData& target, int num_rows_to_append, - MemoryPool* pool); - - Status AppendSelected(MemoryPool* pool, const ExecBatch& batch, int num_rows_to_append, - const uint16_t* row_ids, int num_cols, - const int* col_ids = NULLPTR); - - Status AppendSelected(MemoryPool* pool, const ExecBatch& batch, int num_rows_to_append, - const uint16_t* row_ids, int* num_appended, int num_cols, - const int* col_ids = NULLPTR); - - Status AppendNulls(MemoryPool* pool, - const std::vector>& types, - int num_rows_to_append); - - Status AppendNulls(MemoryPool* pool, - const std::vector>& types, - int num_rows_to_append, int* num_appended); - - // Should only be called if num_rows() returns non-zero. - // - ExecBatch Flush(); - - int num_rows() const { return values_.empty() ? 0 : values_[0].num_rows(); } - - static int num_rows_max() { return 1 << kLogNumRows; } - - private: - static constexpr int kLogNumRows = 15; - - // Calculate how many rows to skip from the tail of the - // sequence of selected rows, such that the total size of skipped rows is at - // least equal to the size specified by the caller. Skipping of the tail rows - // is used to allow for faster processing by the caller of remaining rows - // without checking buffer bounds (useful with SIMD or fixed size memory loads - // and stores). - // - // The sequence of row_ids provided must be non-decreasing. - // - static int NumRowsToSkip(const std::shared_ptr& column, int num_rows, - const uint16_t* row_ids, int num_tail_bytes_to_skip); - - // The supplied lambda will be called for each row in the given list of rows. - // The arguments given to it will be: - // - index of a row (within the set of selected rows), - // - pointer to the value, - // - byte length of the value. - // - // The information about nulls (validity bitmap) is not used in this call and - // has to be processed separately. - // - template - static void Visit(const std::shared_ptr& column, int num_rows, - const uint16_t* row_ids, PROCESS_VALUE_FN process_value_fn); - - template - static void CollectBitsImp(const uint8_t* input_bits, int64_t input_bits_offset, - uint8_t* output_bits, int64_t output_bits_offset, - int num_rows, const uint16_t* row_ids); - static void CollectBits(const uint8_t* input_bits, int64_t input_bits_offset, - uint8_t* output_bits, int64_t output_bits_offset, int num_rows, - const uint16_t* row_ids); - - std::vector values_; -}; - class RowArrayAccessor { public: // Find the index of this varbinary column within the sequence of all @@ -215,10 +104,9 @@ struct RowArray { Status InitIfNeeded(MemoryPool* pool, const ExecBatch& batch); Status InitIfNeeded(MemoryPool* pool, const KeyEncoder::KeyRowMetadata& row_metadata); - Status AppendBatchSelection( - MemoryPool* pool, const ExecBatch& batch, int begin_row_id, int end_row_id, - int num_row_ids, const uint16_t* row_ids, - std::vector& temp_column_arrays); + Status AppendBatchSelection(MemoryPool* pool, const ExecBatch& batch, int begin_row_id, + int end_row_id, int num_row_ids, const uint16_t* row_ids, + std::vector& temp_column_arrays); // This can only be called for a minibatch. // @@ -226,7 +114,7 @@ struct RowArray { const uint16_t* batch_selection_maybe_null, const uint32_t* array_row_ids, uint32_t* out_num_not_equal, uint16_t* out_not_equal_selection, int64_t hardware_flags, util::TempVectorStack* temp_stack, - std::vector& temp_column_arrays, + std::vector& temp_column_arrays, uint8_t* out_match_bitvector_maybe_null = NULLPTR); // TODO: add AVX2 version @@ -391,14 +279,14 @@ struct SwissTableWithKeys { struct Input { Input(const ExecBatch* in_batch, int in_batch_start_row, int in_batch_end_row, util::TempVectorStack* in_temp_stack, - std::vector* in_temp_column_arrays); + std::vector* in_temp_column_arrays); Input(const ExecBatch* in_batch, util::TempVectorStack* in_temp_stack, - std::vector* in_temp_column_arrays); + std::vector* in_temp_column_arrays); Input(const ExecBatch* in_batch, int in_num_selected, const uint16_t* in_selection, util::TempVectorStack* in_temp_stack, - std::vector* in_temp_column_arrays, + std::vector* in_temp_column_arrays, std::vector* in_temp_group_ids); Input(const Input& base, int num_rows_to_skip, int num_rows_to_include); @@ -417,7 +305,7 @@ struct SwissTableWithKeys { // Thread specific scratch buffers for storing temporary data. // util::TempVectorStack* temp_stack; - std::vector* temp_column_arrays; + std::vector* temp_column_arrays; std::vector* temp_group_ids; }; @@ -474,7 +362,7 @@ class SwissTableForJoin { void Lookup(const ExecBatch& batch, int start_row, int num_rows, uint8_t* out_has_match_bitvector, uint32_t* out_key_ids, util::TempVectorStack* temp_stack, - std::vector* temp_column_arrays); + std::vector* temp_column_arrays); void UpdateHasMatchForKeys(int64_t thread_id, int num_rows, const uint32_t* key_ids); void MergeHasMatch(); @@ -528,9 +416,9 @@ class SwissTableForJoinBuild { public: Status Init(SwissTableForJoin* target, int dop, int64_t num_rows, bool reject_duplicate_keys, bool no_payload, - const std::vector& key_types, - const std::vector& payload_types, - MemoryPool* pool, int64_t hardware_flags); + const std::vector& key_types, + const std::vector& payload_types, MemoryPool* pool, + int64_t hardware_flags); // In the first phase of parallel hash table build, threads pick unprocessed // exec batches, partition the rows based on hash, and update all of the @@ -628,7 +516,7 @@ class SwissTableForJoinBuild { std::vector batch_prtn_row_ids; std::vector temp_prtn_ids; std::vector temp_group_ids; - std::vector temp_column_arrays; + std::vector temp_column_arrays; }; std::vector prtn_states_; @@ -851,7 +739,7 @@ class JoinProbeProcessor { const std::vector* cmp, OutputBatchFn output_batch_fn); Status OnNextBatch(int64_t thread_id, const ExecBatch& keypayload_batch, util::TempVectorStack* temp_stack, - std::vector* temp_column_arrays); + std::vector* temp_column_arrays); // Must be called by a single-thread having exclusive access to the instance // of this class. The caller is responsible for ensuring that. diff --git a/cpp/src/arrow/compute/exec/swiss_join_avx2.cc b/cpp/src/arrow/compute/exec/swiss_join_avx2.cc index 2ddb5983448f8..011abc2939451 100644 --- a/cpp/src/arrow/compute/exec/swiss_join_avx2.cc +++ b/cpp/src/arrow/compute/exec/swiss_join_avx2.cc @@ -48,7 +48,6 @@ int RowArrayAccessor::Visit_avx2(const KeyEncoder::KeyRowArray& rows, int column int varbinary_column_id = VarbinaryColumnId(rows.metadata(), column_id); const uint8_t* row_ptr_base = rows.data(2); const uint32_t* row_offsets = rows.offsets(); - uint32_t field_offset_within_row, field_length; if (varbinary_column_id == 0) { // Case 1: This is the first varbinary column @@ -132,7 +131,7 @@ int RowArrayAccessor::Visit_avx2(const KeyEncoder::KeyRowArray& rows, int column // Case 3: This is a fixed length column in fixed length row // const uint8_t* row_ptr_base = rows.data(1); - for (uint32_t i = 0; i < num_rows / unroll; ++i) { + for (int i = 0; i < num_rows / unroll; ++i) { __m256i row_id = _mm256_loadu_si256(reinterpret_cast(row_ids) + i); __m256i row_offset = _mm256_mullo_epi32(row_id, field_length); @@ -144,7 +143,7 @@ int RowArrayAccessor::Visit_avx2(const KeyEncoder::KeyRowArray& rows, int column // const uint8_t* row_ptr_base = rows.data(2); const uint32_t* row_offsets = rows.offsets(); - for (uint32_t i = 0; i < num_rows / unroll; ++i) { + for (int i = 0; i < num_rows / unroll; ++i) { __m256i row_id = _mm256_loadu_si256(reinterpret_cast(row_ids) + i); __m256i row_offset = _mm256_i32gather_epi32( @@ -170,7 +169,7 @@ int RowArrayAccessor::VisitNulls_avx2(const KeyEncoder::KeyRowArray& rows, int c const uint8_t* null_masks = rows.null_masks(); __m256i null_bits_per_row = _mm256_set1_epi32(8 * rows.metadata().null_masks_bytes_per_row); - for (uint32_t i = 0; i < num_rows / unroll; ++i) { + for (int i = 0; i < num_rows / unroll; ++i) { __m256i row_id = _mm256_loadu_si256(reinterpret_cast(row_ids) + i); __m256i bit_id = _mm256_mullo_epi32(row_id, null_bits_per_row); bit_id = _mm256_add_epi32(bit_id, _mm256_set1_epi32(column_id));