Skip to content

Commit

Permalink
Faster version of hash join - post rebase fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
michalursa committed May 4, 2022
1 parent ac272cd commit 4971959
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 850 deletions.
13 changes: 6 additions & 7 deletions cpp/src/arrow/compute/exec/key_compare.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ template <bool use_selection>
void KeyCompare::NullUpdateColumnToRow(
uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
uint8_t* match_bytevector, bool are_cols_in_encoding_order) {
if (!rows.has_any_nulls(ctx) && !col.data(0)) {
return;
Expand Down Expand Up @@ -234,7 +234,7 @@ void KeyCompare::CompareVarBinaryColumnToRowHelper(
uint32_t id_varbinary_col, uint32_t first_row_to_compare,
uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
uint8_t* match_bytevector) {
const uint32_t* offsets_left = col.offsets();
const uint32_t* offsets_right = rows.offsets();
Expand Down Expand Up @@ -289,7 +289,7 @@ template <bool use_selection, bool is_first_varbinary_col>
void KeyCompare::CompareVarBinaryColumnToRow(
uint32_t id_varbinary_col, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
uint32_t num_processed = 0;
#if defined(ARROW_HAVE_AVX2)
Expand Down Expand Up @@ -325,9 +325,8 @@ void KeyCompare::CompareColumnsToRows(
uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
uint32_t* out_num_rows, uint16_t* out_sel_left_maybe_same,
const std::vector<KeyEncoder::KeyColumnArray>& cols,
const KeyEncoder::KeyRowArray& rows, bool are_cols_in_encoding_order,
uint8_t* out_match_bitvector_maybe_null) {
const std::vector<KeyColumnArray>& cols, const KeyEncoder::KeyRowArray& rows,
bool are_cols_in_encoding_order, uint8_t* out_match_bitvector_maybe_null) {
if (num_rows_to_compare == 0) {
*out_num_rows = 0;
return;
Expand All @@ -347,7 +346,7 @@ void KeyCompare::CompareColumnsToRows(

bool is_first_column = true;
for (size_t icol = 0; icol < cols.size(); ++icol) {
const KeyEncoder::KeyColumnArray& col = cols[icol];
const KeyColumnArray& col = cols[icol];

if (col.metadata().is_null_type) {
// If this null type col is the first column, the match_bytevector_A needs to be
Expand Down
9 changes: 4 additions & 5 deletions cpp/src/arrow/compute/exec/key_compare.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,15 @@ class KeyCompare {
uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
uint32_t* out_num_rows, uint16_t* out_sel_left_maybe_same,
const std::vector<KeyEncoder::KeyColumnArray>& cols,
const KeyEncoder::KeyRowArray& rows, bool are_cols_in_encoding_order,
uint8_t* out_match_bitvector_maybe_null = NULLPTR);
const std::vector<KeyColumnArray>& cols, const KeyEncoder::KeyRowArray& rows,
bool are_cols_in_encoding_order, uint8_t* out_match_bitvector_maybe_null = NULLPTR);

private:
template <bool use_selection>
static void NullUpdateColumnToRow(
uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
uint8_t* match_bytevector, bool are_cols_in_encoding_order);

template <bool use_selection, class COMPARE_FN>
Expand All @@ -68,7 +67,7 @@ class KeyCompare {
static void CompareVarBinaryColumnToRowHelper(
uint32_t id_varlen_col, uint32_t first_row_to_compare, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);

template <bool use_selection, bool is_first_varbinary_col>
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/key_compare_avx2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ uint32_t KeyCompare::CompareBinaryColumnToRow_avx2(
// In this case we will access left column memory 4B at a time
num_rows_safe =
TailSkipForSIMD::FixBitAccess(sizeof(uint32_t), col.length(), col.bit_offset(1));
} else if (col_width == 1 && col_width == 2) {
} else if (col_width == 1 || col_width == 2) {
// In this case we will access left column memory 4B at a time
num_rows_safe =
TailSkipForSIMD::FixBinaryAccess(sizeof(uint32_t), col.length(), col_width);
Expand Down
64 changes: 0 additions & 64 deletions cpp/src/arrow/compute/exec/key_encode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1290,69 +1290,5 @@ Status KeyEncoder::EncodeSelected(KeyRowArray* rows, uint32_t num_selected,
return Status::OK();
}

KeyEncoder::KeyColumnMetadata ColumnMetadataFromDataType(
const std::shared_ptr<DataType>& type) {
if (type->id() == Type::DICTIONARY) {
auto bit_width =
arrow::internal::checked_cast<const FixedWidthType&>(*type).bit_width();
ARROW_DCHECK(bit_width % 8 == 0);
return KeyEncoder::KeyColumnMetadata(true, bit_width / 8);
} else if (type->id() == Type::BOOL) {
return KeyEncoder::KeyColumnMetadata(true, 0);
} else if (is_fixed_width(type->id())) {
return KeyEncoder::KeyColumnMetadata(
true,
arrow::internal::checked_cast<const FixedWidthType&>(*type).bit_width() / 8);
} else if (is_binary_like(type->id())) {
return KeyEncoder::KeyColumnMetadata(false, sizeof(uint32_t));
}
ARROW_DCHECK(false);
return KeyEncoder::KeyColumnMetadata(true, sizeof(int));
}

KeyEncoder::KeyColumnArray ColumnArrayFromArrayData(
const std::shared_ptr<ArrayData>& array_data, int start_row, int num_rows) {
KeyEncoder::KeyColumnArray column_array = KeyEncoder::KeyColumnArray(
ColumnMetadataFromDataType(array_data->type),
array_data->offset + start_row + num_rows,
array_data->buffers[0] != NULLPTR ? array_data->buffers[0]->data() : nullptr,
array_data->buffers[1]->data(),
(array_data->buffers.size() > 2 && array_data->buffers[2] != NULLPTR)
? array_data->buffers[2]->data()
: nullptr);
return KeyEncoder::KeyColumnArray(column_array, array_data->offset + start_row,
num_rows);
}

void ColumnMetadatasFromExecBatch(
const ExecBatch& batch,
std::vector<KeyEncoder::KeyColumnMetadata>& column_metadatas) {
int num_columns = static_cast<int>(batch.values.size());
column_metadatas.resize(num_columns);
for (int i = 0; i < num_columns; ++i) {
const Datum& data = batch.values[i];
ARROW_DCHECK(data.is_array());
const std::shared_ptr<ArrayData>& array_data = data.array();
column_metadatas[i] = ColumnMetadataFromDataType(array_data->type);
}
}

void ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
std::vector<KeyEncoder::KeyColumnArray>& column_arrays) {
int num_columns = static_cast<int>(batch.values.size());
column_arrays.resize(num_columns);
for (int i = 0; i < num_columns; ++i) {
const Datum& data = batch.values[i];
ARROW_DCHECK(data.is_array());
const std::shared_ptr<ArrayData>& array_data = data.array();
column_arrays[i] = ColumnArrayFromArrayData(array_data, start_row, num_rows);
}
}

void ColumnArraysFromExecBatch(const ExecBatch& batch,
std::vector<KeyEncoder::KeyColumnArray>& column_arrays) {
ColumnArraysFromExecBatch(batch, 0, static_cast<int>(batch.length), column_arrays);
}

} // namespace compute
} // namespace arrow
15 changes: 2 additions & 13 deletions cpp/src/arrow/compute/exec/key_encode.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ class KeyEncoder {
mutable bool has_any_nulls_;
};

void Init(const std::vector<KeyColumnMetadata>& cols, KeyEncoderContext* ctx,
int row_alignment, int string_alignment);
void Init(const std::vector<KeyColumnMetadata>& cols, int row_alignment,
int string_alignment);

const KeyRowMetadata& row_metadata() { return row_metadata_; }

Expand Down Expand Up @@ -502,16 +502,5 @@ inline void KeyEncoder::EncoderVarBinary::DecodeHelper(
}
}

KeyEncoder::KeyColumnMetadata ColumnMetadataFromDataType(
const std::shared_ptr<DataType>& type);
KeyEncoder::KeyColumnArray ColumnArrayFromArrayData(
const std::shared_ptr<ArrayData>& array_data, int start_row, int num_rows);
void ColumnMetadatasFromExecBatch(
const ExecBatch& batch, std::vector<KeyEncoder::KeyColumnMetadata>& column_metadatas);
void ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
std::vector<KeyEncoder::KeyColumnArray>& column_arrays);
void ColumnArraysFromExecBatch(const ExecBatch& batch,
std::vector<KeyEncoder::KeyColumnArray>& column_arrays);

} // namespace compute
} // namespace arrow
22 changes: 12 additions & 10 deletions cpp/src/arrow/compute/exec/key_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -456,15 +456,16 @@ void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
}
}

void Hashing32::HashBatch(const ExecBatch& key_batch, int start_row, int num_rows,
uint32_t* hashes,
std::vector<KeyEncoder::KeyColumnArray>& column_arrays,
int64_t hardware_flags, util::TempVectorStack* temp_stack) {
ColumnArraysFromExecBatch(key_batch, start_row, num_rows, column_arrays);
Status Hashing32::HashBatch(const ExecBatch& key_batch, int start_row, int num_rows,
uint32_t* hashes, std::vector<KeyColumnArray>& column_arrays,
int64_t hardware_flags, util::TempVectorStack* temp_stack) {
RETURN_NOT_OK(
ColumnArraysFromExecBatch(key_batch, start_row, num_rows, &column_arrays));
KeyEncoder::KeyEncoderContext ctx;
ctx.hardware_flags = hardware_flags;
ctx.stack = temp_stack;
HashMultiColumn(column_arrays, &ctx, hashes);
return Status::OK();
}

inline uint64_t Hashing64::Avalanche(uint64_t acc) {
Expand Down Expand Up @@ -886,15 +887,16 @@ void Hashing64::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
}
}

void Hashing64::HashBatch(const ExecBatch& key_batch, int start_row, int num_rows,
uint64_t* hashes,
std::vector<KeyEncoder::KeyColumnArray>& column_arrays,
int64_t hardware_flags, util::TempVectorStack* temp_stack) {
ColumnArraysFromExecBatch(key_batch, start_row, num_rows, column_arrays);
Status Hashing64::HashBatch(const ExecBatch& key_batch, int start_row, int num_rows,
uint64_t* hashes, std::vector<KeyColumnArray>& column_arrays,
int64_t hardware_flags, util::TempVectorStack* temp_stack) {
RETURN_NOT_OK(
ColumnArraysFromExecBatch(key_batch, start_row, num_rows, &column_arrays));
KeyEncoder::KeyEncoderContext ctx;
ctx.hardware_flags = hardware_flags;
ctx.stack = temp_stack;
HashMultiColumn(column_arrays, &ctx, hashes);
return Status::OK();
}

} // namespace compute
Expand Down
14 changes: 6 additions & 8 deletions cpp/src/arrow/compute/exec/key_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ class ARROW_EXPORT Hashing32 {
static void HashMultiColumn(const std::vector<KeyColumnArray>& cols,
KeyEncoder::KeyEncoderContext* ctx, uint32_t* out_hash);

static void HashBatch(const ExecBatch& key_batch, int start_row, int num_rows,
uint32_t* hashes,
std::vector<KeyEncoder::KeyColumnArray>& column_arrays,
int64_t hardware_flags, util::TempVectorStack* temp_stack);
static Status HashBatch(const ExecBatch& key_batch, int start_row, int num_rows,
uint32_t* hashes, std::vector<KeyColumnArray>& column_arrays,
int64_t hardware_flags, util::TempVectorStack* temp_stack);

private:
static const uint32_t PRIME32_1 = 0x9E3779B1;
Expand Down Expand Up @@ -161,10 +160,9 @@ class ARROW_EXPORT Hashing64 {
static void HashMultiColumn(const std::vector<KeyColumnArray>& cols,
KeyEncoder::KeyEncoderContext* ctx, uint64_t* hashes);

static void HashBatch(const ExecBatch& key_batch, int start_row, int num_rows,
uint64_t* hashes,
std::vector<KeyEncoder::KeyColumnArray>& column_arrays,
int64_t hardware_flags, util::TempVectorStack* temp_stack);
static Status HashBatch(const ExecBatch& key_batch, int start_row, int num_rows,
uint64_t* hashes, std::vector<KeyColumnArray>& column_arrays,
int64_t hardware_flags, util::TempVectorStack* temp_stack);

private:
static const uint64_t PRIME64_1 = 0x9E3779B185EBCA87ULL;
Expand Down
Loading

0 comments on commit 4971959

Please sign in to comment.