[Enhancement] revert SegmentedChunk for JoinHashTable (StarRocks#52570)
Signed-off-by: Murphy <[email protected]>
Signed-off-by: zhiminr.ren <[email protected]>
murphyatwork authored and renzhimin7 committed Nov 7, 2024
1 parent c1908fd commit 7a7edc7
Showing 4 changed files with 41 additions and 38 deletions.
be/src/exec/join_hash_map.cpp: 23 additions & 17 deletions
@@ -335,7 +335,7 @@ void JoinHashTable::create(const HashTableParam& param) {
         _probe_state->probe_counter = param.probe_counter;
     }
 
-    _table_items->build_chunk = std::make_shared<SegmentedChunk>(param.build_chunk_segment_size);
+    _table_items->build_chunk = std::make_shared<Chunk>();
     _table_items->with_other_conjunct = param.with_other_conjunct;
     _table_items->join_type = param.join_type;
     _table_items->mor_reader_mode = param.mor_reader_mode;
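The heart of the revert is this one-line change: the build side goes back to accumulating all rows in a single contiguous Chunk instead of a SegmentedChunk. As a rough illustration of the trade-off, with stand-in types that are not the actual StarRocks classes: a segmented layout avoids one huge contiguous allocation but pays an extra indirection on every row access.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Stand-in types for illustration only; not the StarRocks classes.

// Plain chunk layout: one contiguous vector per column, row i is data[i].
struct PlainColumnSketch {
    std::vector<int64_t> data;

    int64_t at(size_t i) const { return data[i]; }
};

// Segmented layout: rows are split across fixed-size segments (the reverted
// code sized them with param.build_chunk_segment_size), so each row access
// pays an extra indirection: (i / segment_size, i % segment_size).
struct SegmentedColumnSketch {
    size_t segment_size = 0;
    std::vector<std::vector<int64_t>> segments;

    int64_t at(size_t i) const { return segments[i / segment_size][i % segment_size]; }
};
```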
@@ -486,7 +486,6 @@ void JoinHashTable::_init_build_column(const HashTableParam& param) {
             _table_items->build_column_count++;
         }
     }
-    _table_items->build_chunk->build_columns();
 }
 
 void JoinHashTable::_init_mor_reader() {
@@ -512,9 +511,13 @@ void JoinHashTable::_init_mor_reader() {
 
 void JoinHashTable::_init_join_keys() {
     for (const auto& key_desc : _table_items->join_keys) {
-        auto key_column = ColumnHelper::create_column(*key_desc.type, false);
-        key_column->append_default();
-        _table_items->key_columns.emplace_back(key_column);
+        if (key_desc.col_ref) {
+            _table_items->key_columns.emplace_back(nullptr);
+        } else {
+            auto key_column = ColumnHelper::create_column(*key_desc.type, false);
+            key_column->append_default();
+            _table_items->key_columns.emplace_back(key_column);
+        }
     }
 }
 
@@ -542,12 +545,12 @@ Status JoinHashTable::build(RuntimeState* state) {
     RETURN_IF_ERROR(_table_items->build_chunk->upgrade_if_overflow());
     _table_items->has_large_column = _table_items->build_chunk->has_large_column();
 
-    // build key_columns
+    // If the join key is column ref of build chunk, fetch from build chunk directly
     size_t join_key_count = _table_items->join_keys.size();
     for (size_t i = 0; i < join_key_count; i++) {
         if (_table_items->join_keys[i].col_ref != nullptr) {
             SlotId slot_id = _table_items->join_keys[i].col_ref->slot_id();
-            _table_items->key_columns[i] = _table_items->build_chunk->get_column_by_slot_id(slot_id)->materialize();
+            _table_items->key_columns[i] = _table_items->build_chunk->get_column_by_slot_id(slot_id);
         }
     }
 
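A note on the dropped `->materialize()`, as an inference from the types involved rather than something the diff states: a segmented column fetched by slot id presumably had to be flattened into one contiguous column before it could serve as a hash-table key column, whereas a plain Chunk column is already contiguous and can be shared by pointer. A hedged sketch of the cost difference, with stand-in types:

```cpp
#include <memory>
#include <vector>

using ColumnSketchPtr = std::shared_ptr<std::vector<int64_t>>;

// Segmented case: building one contiguous key column requires a gather copy.
ColumnSketchPtr materialize_segments(const std::vector<ColumnSketchPtr>& segments) {
    auto flat = std::make_shared<std::vector<int64_t>>();
    for (const auto& seg : segments) {
        flat->insert(flat->end(), seg->begin(), seg->end());  // O(total rows) copy
    }
    return flat;
}

// Plain-chunk case: the column is already contiguous, so "fetching" the key
// column is just a shared_ptr copy with no data movement.
ColumnSketchPtr share_plain(const ColumnSketchPtr& column) {
    return column;
}
```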
@@ -623,15 +626,19 @@ Status JoinHashTable::probe_remain(RuntimeState* state, ChunkPtr* chunk, bool* e
 }
 
 void JoinHashTable::append_chunk(const ChunkPtr& chunk, const Columns& key_columns) {
-    // TODO: simplify the SlotId mapping, if the slot of input chunk is same as build_chunk, we don't need to remap them
-    std::vector<SlotId> slots;
+    Columns& columns = _table_items->build_chunk->columns();
 
     for (size_t i = 0; i < _table_items->build_column_count; i++) {
         SlotDescriptor* slot = _table_items->build_slots[i].slot;
-        slots.push_back(slot->id());
+        ColumnPtr& column = chunk->get_column_by_slot_id(slot->id());
+
+        if (!columns[i]->is_nullable() && column->is_nullable()) {
+            // upgrade to nullable column
+            columns[i] = NullableColumn::create(columns[i], NullColumn::create(columns[i]->size(), 0));
+        }
+        columns[i]->append(*column);
     }
-    _table_items->build_chunk->append_chunk(chunk, slots);
-
+    // TODO: it's useless for the optimizer, but there're still some UTs depending on it
     for (size_t i = 0; i < _table_items->key_columns.size(); i++) {
         // If the join key is slot ref, will get from build chunk directly,
         // otherwise will append from key_column of input
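Both `append_chunk` above and `merge_ht` below repeat one pattern worth calling out: when the accumulated build column is still non-nullable but incoming data is nullable, the existing data is first wrapped in a NullableColumn with an all-zeros null map (no existing row is null), and only then appended to. A minimal, self-contained illustration of that invariant, using stand-in types rather than the StarRocks column classes:

```cpp
#include <cstdint>
#include <vector>

// Stand-in column for illustration only.
struct ColumnSketch {
    std::vector<int64_t> data;
    bool nullable = false;
    std::vector<uint8_t> null_flags;  // parallel to data when nullable
};

// Mirrors columns[i] = NullableColumn::create(columns[i],
// NullColumn::create(columns[i]->size(), 0)): existing rows get flag 0 (not null).
void upgrade_to_nullable(ColumnSketch& col) {
    if (col.nullable) return;
    col.nullable = true;
    col.null_flags.assign(col.data.size(), 0);
}

// Append may mix nullabilities safely: upgrade first, then copy data and flags.
void append(ColumnSketch& dst, const ColumnSketch& src) {
    if (!dst.nullable && src.nullable) upgrade_to_nullable(dst);
    dst.data.insert(dst.data.end(), src.data.begin(), src.data.end());
    if (dst.nullable) {
        if (src.nullable) {
            dst.null_flags.insert(dst.null_flags.end(), src.null_flags.begin(), src.null_flags.end());
        } else {
            dst.null_flags.insert(dst.null_flags.end(), src.data.size(), 0);
        }
    }
}
```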
@@ -647,22 +654,21 @@
     }
 
     _table_items->row_count += chunk->num_rows();
-    DCHECK_EQ(_table_items->row_count + 1, _table_items->build_chunk->num_rows());
 }
 
 void JoinHashTable::merge_ht(const JoinHashTable& ht) {
     _table_items->row_count += ht._table_items->row_count;
 
-    auto& columns = _table_items->build_chunk->columns();
-    auto& other_columns = ht._table_items->build_chunk->columns();
+    Columns& columns = _table_items->build_chunk->columns();
+    Columns& other_columns = ht._table_items->build_chunk->columns();
 
     for (size_t i = 0; i < _table_items->build_column_count; i++) {
         if (!columns[i]->is_nullable() && other_columns[i]->is_nullable()) {
             // upgrade to nullable column
-            columns[i]->upgrade_to_nullable();
+            columns[i] = NullableColumn::create(columns[i], NullColumn::create(columns[i]->size(), 0));
         }
+        columns[i]->append(*other_columns[i], 1, other_columns[i]->size() - 1);
     }
-    _table_items->build_chunk->append(ht._table_items->build_chunk, 1);
 }
 
 ChunkPtr JoinHashTable::convert_to_spill_schema(const ChunkPtr& chunk) const {
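A detail that is easy to miss in `merge_ht`: the per-column append starts at offset 1 (`append(*other_columns[i], 1, size - 1)`). This matches the sentinel convention visible throughout the file: row 0 of every build column is a reserved default row (see `append_default()` in `_init_join_keys`, and the loops in the .tpp below that iterate from index 1), so merging must not duplicate the other table's sentinel. A tiny runnable demonstration of the bookkeeping, with plain vectors standing in for columns:

```cpp
#include <cassert>
#include <vector>

int main() {
    // Row 0 is a reserved sentinel in both tables; real rows start at index 1.
    std::vector<int> ht1 = {0, 10, 11};      // sentinel + 2 real rows
    std::vector<int> ht2 = {0, 20, 21, 22};  // sentinel + 3 real rows

    // merge_ht copies from offset 1 so ht2's sentinel is not duplicated:
    ht1.insert(ht1.end(), ht2.begin() + 1, ht2.end());

    assert(ht1.size() == 1 + 2 + 3);  // one sentinel + all real rows
    return 0;
}
```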
be/src/exec/join_hash_map.h: 5 additions & 9 deletions
@@ -14,7 +14,6 @@
 
 #pragma once
 
-#include "storage/chunk_helper.h"
 #include "util/runtime_profile.h"
 #define JOIN_HASH_MAP_H
 
@@ -100,7 +99,8 @@ struct HashTableSlotDescriptor {
 };
 
 struct JoinHashTableItems {
-    SegmentedChunkPtr build_chunk = nullptr;
+    // TODO: memory continuity problem?
+    ChunkPtr build_chunk = nullptr;
     Columns key_columns;
     std::vector<HashTableSlotDescriptor> build_slots;
     std::vector<HashTableSlotDescriptor> probe_slots;
@@ -294,9 +294,6 @@ struct HashTableParam {
     RuntimeProfile::Counter* output_probe_column_timer = nullptr;
     RuntimeProfile::Counter* probe_counter = nullptr;
     bool mor_reader_mode = false;
-
-    // TODO: optimize this according to chunk width
-    size_t build_chunk_segment_size = 1 << 20;
 };
 
 template <class T>
@@ -691,10 +688,9 @@ class JoinHashMap {
 
     void _copy_probe_nullable_column(ColumnPtr* src_column, ChunkPtr* chunk, const SlotDescriptor* slot);
 
-    void _copy_build_column(const SegmentedColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot,
-                            bool to_nullable);
+    void _copy_build_column(const ColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot, bool to_nullable);
 
-    void _copy_build_nullable_column(const SegmentedColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot);
+    void _copy_build_nullable_column(const ColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot);
 
     void _probe_index_output(ChunkPtr* chunk);
     void _build_index_output(ChunkPtr* chunk);
@@ -842,7 +838,7 @@ class JoinHashTable {
     // convert input column to spill schema order
     ChunkPtr convert_to_spill_schema(const ChunkPtr& chunk) const;
 
-    const SegmentedChunkPtr& get_build_chunk() const { return _table_items->build_chunk; }
+    const ChunkPtr& get_build_chunk() const { return _table_items->build_chunk; }
     Columns& get_key_columns() { return _table_items->key_columns; }
     const Columns& get_key_columns() const { return _table_items->key_columns; }
     uint32_t get_row_count() const { return _table_items->row_count; }
be/src/exec/join_hash_map.tpp: 12 additions & 11 deletions
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "column/vectorized_fwd.h"
#include "simd/simd.h"
#include "util/runtime_profile.h"

@@ -59,7 +58,6 @@ void JoinBuildFunc<LT>::construct_hash_table(RuntimeState* state, JoinHashTableI
     if (table_items->key_columns[0]->is_nullable()) {
         auto* nullable_column = ColumnHelper::as_raw_column<NullableColumn>(table_items->key_columns[0]);
         auto& null_array = nullable_column->null_column()->get_data();
-        DCHECK_EQ(data.size(), table_items->row_count + 1);
         for (size_t i = 1; i < table_items->row_count + 1; i++) {
             if (null_array[i] == 0) {
                 uint32_t bucket_num = JoinHashMapHelper::calc_bucket_num<CppType>(data[i], table_items->bucket_size);
@@ -68,7 +66,6 @@
             }
         }
     } else {
-        DCHECK_EQ(data.size(), table_items->row_count + 1);
         for (size_t i = 1; i < table_items->row_count + 1; i++) {
             uint32_t bucket_num = JoinHashMapHelper::calc_bucket_num<CppType>(data[i], table_items->bucket_size);
             table_items->next[i] = table_items->first[bucket_num];
@@ -622,7 +619,7 @@ void JoinHashMap<LT, BuildFunc, ProbeFunc>::_build_output(ChunkPtr* chunk) {
 
         bool need_output = is_lazy ? hash_table_slot.need_lazy_materialize : hash_table_slot.need_output;
         if (need_output) {
-            auto& column = _table_items->build_chunk->columns()[i];
+            ColumnPtr& column = _table_items->build_chunk->columns()[i];
             if (!column->is_nullable()) {
                 _copy_build_column(column, chunk, slot, to_nullable);
             } else {
@@ -687,10 +684,11 @@ void JoinHashMap<LT, BuildFunc, ProbeFunc>::_copy_probe_nullable_column(ColumnPt
 }
 
 template <LogicalType LT, class BuildFunc, class ProbeFunc>
-void JoinHashMap<LT, BuildFunc, ProbeFunc>::_copy_build_column(const SegmentedColumnPtr& src_column, ChunkPtr* chunk,
+void JoinHashMap<LT, BuildFunc, ProbeFunc>::_copy_build_column(const ColumnPtr& src_column, ChunkPtr* chunk,
                                                                const SlotDescriptor* slot, bool to_nullable) {
     if (to_nullable) {
-        auto data_column = src_column->clone_selective(_probe_state->build_index.data(), 0, _probe_state->count);
+        auto data_column = src_column->clone_empty();
+        data_column->append_selective(*src_column, _probe_state->build_index.data(), 0, _probe_state->count);
 
         // When left outer join is executed,
         // build_index[i] Equal to 0 means it is not found in the hash table,
@@ -706,15 +704,18 @@
         auto dest_column = NullableColumn::create(std::move(data_column), null_column);
         (*chunk)->append_column(std::move(dest_column), slot->id());
     } else {
-        auto data_column = src_column->clone_selective(_probe_state->build_index.data(), 0, _probe_state->count);
-        (*chunk)->append_column(std::move(data_column), slot->id());
+        auto dest_column = src_column->clone_empty();
+        dest_column->append_selective(*src_column, _probe_state->build_index.data(), 0, _probe_state->count);
+        (*chunk)->append_column(std::move(dest_column), slot->id());
     }
 }
 
 template <LogicalType LT, class BuildFunc, class ProbeFunc>
-void JoinHashMap<LT, BuildFunc, ProbeFunc>::_copy_build_nullable_column(const SegmentedColumnPtr& src_column,
-                                                                        ChunkPtr* chunk, const SlotDescriptor* slot) {
-    ColumnPtr dest_column = src_column->clone_selective(_probe_state->build_index.data(), 0, _probe_state->count);
+void JoinHashMap<LT, BuildFunc, ProbeFunc>::_copy_build_nullable_column(const ColumnPtr& src_column, ChunkPtr* chunk,
                                                                        const SlotDescriptor* slot) {
+    ColumnPtr dest_column = src_column->clone_empty();
+
+    dest_column->append_selective(*src_column, _probe_state->build_index.data(), 0, _probe_state->count);
 
     // When left outer join is executed,
     // build_index[i] Equal to 0 means it is not found in the hash table,
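With SegmentedColumn gone, the one-shot `clone_selective` gather is spelled as `clone_empty()` plus `append_selective(...)`, and the left-outer-join NULL marking still keys off the sentinel: any gathered index equal to 0 is a probe miss. A self-contained sketch of that gather-then-mark step, with plain vectors standing in for the column classes:

```cpp
#include <cstdint>
#include <vector>

// Stand-in for a build column; index 0 is the reserved sentinel row.
using ColumnSketch = std::vector<int64_t>;

// clone_empty() + append_selective(): gather build rows by index in one pass.
ColumnSketch gather(const ColumnSketch& src, const std::vector<uint32_t>& build_index) {
    ColumnSketch dst;                   // clone_empty()
    dst.reserve(build_index.size());
    for (uint32_t idx : build_index) {  // append_selective()
        dst.push_back(src[idx]);
    }
    return dst;
}

// For left outer join, build_index == 0 means "no match": the gathered
// sentinel value is kept, but the row is flagged NULL in the null map.
std::vector<uint8_t> null_map(const std::vector<uint32_t>& build_index) {
    std::vector<uint8_t> nulls(build_index.size(), 0);
    for (size_t i = 0; i < build_index.size(); i++) {
        nulls[i] = (build_index[i] == 0) ? 1 : 0;
    }
    return nulls;
}
```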
be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.h: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ class SpillableHashJoinBuildOperator final : public HashJoinBuildOperator {
 
     size_t _hash_table_iterate_idx = 0;
     std::vector<JoinHashTable*> _hash_tables;
-    SegmentedChunkSlice _hash_table_build_chunk_slice;
+    ChunkSharedSlice _hash_table_build_chunk_slice;
     std::function<StatusOr<ChunkPtr>()> _hash_table_slice_iterator;
     bool _is_first_time_spill = true;
     DECLARE_ONCE_DETECTOR(_set_finishing_once);
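The only change in this file is the slice type used to walk the build chunk when spilling: the segmented slice goes away together with SegmentedChunk. As a rough, illustrative sketch (invented names, not the StarRocks types), a chunk slice is a cursor that hands out bounded-size pieces of one large chunk, so the spill path never has to serialize the whole hash table at once:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Illustrative stand-in for a chunk; only the row count matters here.
struct ChunkSketch {
    std::vector<int64_t> rows;
    size_t num_rows() const { return rows.size(); }
};
using ChunkSketchPtr = std::shared_ptr<ChunkSketch>;

// Cursor over one large chunk that yields pieces of at most max_rows rows.
class ChunkSliceSketch {
public:
    void reset(ChunkSketchPtr chunk) {
        _chunk = std::move(chunk);
        _offset = 0;
    }
    bool empty() const { return !_chunk || _offset >= _chunk->num_rows(); }

    // Precondition: !empty(). Copies the next bounded piece and advances.
    ChunkSketchPtr cut(size_t max_rows) {
        size_t n = std::min(max_rows, _chunk->num_rows() - _offset);
        auto piece = std::make_shared<ChunkSketch>();
        piece->rows.assign(_chunk->rows.begin() + _offset, _chunk->rows.begin() + _offset + n);
        _offset += n;
        return piece;
    }

private:
    ChunkSketchPtr _chunk;
    size_t _offset = 0;
};
```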
