diff --git a/be/src/exec/join_hash_map.cpp b/be/src/exec/join_hash_map.cpp index aee455caed969..ee83d7e6cfad9 100644 --- a/be/src/exec/join_hash_map.cpp +++ b/be/src/exec/join_hash_map.cpp @@ -335,7 +335,7 @@ void JoinHashTable::create(const HashTableParam& param) { _probe_state->probe_counter = param.probe_counter; } - _table_items->build_chunk = std::make_shared(param.build_chunk_segment_size); + _table_items->build_chunk = std::make_shared(); _table_items->with_other_conjunct = param.with_other_conjunct; _table_items->join_type = param.join_type; _table_items->mor_reader_mode = param.mor_reader_mode; @@ -486,7 +486,6 @@ void JoinHashTable::_init_build_column(const HashTableParam& param) { _table_items->build_column_count++; } } - _table_items->build_chunk->build_columns(); } void JoinHashTable::_init_mor_reader() { @@ -512,9 +511,13 @@ void JoinHashTable::_init_mor_reader() { void JoinHashTable::_init_join_keys() { for (const auto& key_desc : _table_items->join_keys) { - auto key_column = ColumnHelper::create_column(*key_desc.type, false); - key_column->append_default(); - _table_items->key_columns.emplace_back(key_column); + if (key_desc.col_ref) { + _table_items->key_columns.emplace_back(nullptr); + } else { + auto key_column = ColumnHelper::create_column(*key_desc.type, false); + key_column->append_default(); + _table_items->key_columns.emplace_back(key_column); + } } } @@ -542,12 +545,12 @@ Status JoinHashTable::build(RuntimeState* state) { RETURN_IF_ERROR(_table_items->build_chunk->upgrade_if_overflow()); _table_items->has_large_column = _table_items->build_chunk->has_large_column(); - // build key_columns + // If the join key is column ref of build chunk, fetch from build chunk directly size_t join_key_count = _table_items->join_keys.size(); for (size_t i = 0; i < join_key_count; i++) { if (_table_items->join_keys[i].col_ref != nullptr) { SlotId slot_id = _table_items->join_keys[i].col_ref->slot_id(); - _table_items->key_columns[i] = 
_table_items->build_chunk->get_column_by_slot_id(slot_id)->materialize(); + _table_items->key_columns[i] = _table_items->build_chunk->get_column_by_slot_id(slot_id); } } @@ -623,15 +626,19 @@ Status JoinHashTable::probe_remain(RuntimeState* state, ChunkPtr* chunk, bool* e } void JoinHashTable::append_chunk(const ChunkPtr& chunk, const Columns& key_columns) { - // TODO: simplify the SlotId mapping, if the slot of input chunk is same as build_chunk, we don't need to remap them - std::vector slots; + Columns& columns = _table_items->build_chunk->columns(); + for (size_t i = 0; i < _table_items->build_column_count; i++) { SlotDescriptor* slot = _table_items->build_slots[i].slot; - slots.push_back(slot->id()); + ColumnPtr& column = chunk->get_column_by_slot_id(slot->id()); + + if (!columns[i]->is_nullable() && column->is_nullable()) { + // upgrade to nullable column + columns[i] = NullableColumn::create(columns[i], NullColumn::create(columns[i]->size(), 0)); + } + columns[i]->append(*column); } - _table_items->build_chunk->append_chunk(chunk, slots); - // TODO: it's useless for the optimizer, but there're stil some UT depending on it for (size_t i = 0; i < _table_items->key_columns.size(); i++) { // If the join key is slot ref, will get from build chunk directly, // otherwise will append from key_column of input @@ -647,22 +654,21 @@ void JoinHashTable::append_chunk(const ChunkPtr& chunk, const Columns& key_colum } _table_items->row_count += chunk->num_rows(); - DCHECK_EQ(_table_items->row_count + 1, _table_items->build_chunk->num_rows()); } void JoinHashTable::merge_ht(const JoinHashTable& ht) { _table_items->row_count += ht._table_items->row_count; - auto& columns = _table_items->build_chunk->columns(); - auto& other_columns = ht._table_items->build_chunk->columns(); + Columns& columns = _table_items->build_chunk->columns(); + Columns& other_columns = ht._table_items->build_chunk->columns(); for (size_t i = 0; i < _table_items->build_column_count; i++) { if 
(!columns[i]->is_nullable() && other_columns[i]->is_nullable()) { // upgrade to nullable column - columns[i]->upgrade_to_nullable(); + columns[i] = NullableColumn::create(columns[i], NullColumn::create(columns[i]->size(), 0)); } + columns[i]->append(*other_columns[i], 1, other_columns[i]->size() - 1); } - _table_items->build_chunk->append(ht._table_items->build_chunk, 1); } ChunkPtr JoinHashTable::convert_to_spill_schema(const ChunkPtr& chunk) const { diff --git a/be/src/exec/join_hash_map.h b/be/src/exec/join_hash_map.h index 016bda48aa3ba..a499a5fb9cfda 100644 --- a/be/src/exec/join_hash_map.h +++ b/be/src/exec/join_hash_map.h @@ -14,7 +14,6 @@ #pragma once -#include "storage/chunk_helper.h" #include "util/runtime_profile.h" #define JOIN_HASH_MAP_H @@ -100,7 +99,8 @@ struct HashTableSlotDescriptor { }; struct JoinHashTableItems { - SegmentedChunkPtr build_chunk = nullptr; + // TODO: contiguous memory problem? + ChunkPtr build_chunk = nullptr; Columns key_columns; std::vector build_slots; std::vector probe_slots; @@ -294,9 +294,6 @@ struct HashTableParam { RuntimeProfile::Counter* output_probe_column_timer = nullptr; RuntimeProfile::Counter* probe_counter = nullptr; bool mor_reader_mode = false; - - // TODO: optimize this according to chunk width - size_t build_chunk_segment_size = 1 << 20; }; template @@ -691,10 +688,9 @@ class JoinHashMap { void _copy_probe_nullable_column(ColumnPtr* src_column, ChunkPtr* chunk, const SlotDescriptor* slot); - void _copy_build_column(const SegmentedColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot, - bool to_nullable); + void _copy_build_column(const ColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot, bool to_nullable); - void _copy_build_nullable_column(const SegmentedColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot); + void _copy_build_nullable_column(const ColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot); void _probe_index_output(ChunkPtr* chunk); void 
_build_index_output(ChunkPtr* chunk); @@ -842,7 +838,7 @@ class JoinHashTable { // convert input column to spill schema order ChunkPtr convert_to_spill_schema(const ChunkPtr& chunk) const; - const SegmentedChunkPtr& get_build_chunk() const { return _table_items->build_chunk; } + const ChunkPtr& get_build_chunk() const { return _table_items->build_chunk; } Columns& get_key_columns() { return _table_items->key_columns; } const Columns& get_key_columns() const { return _table_items->key_columns; } uint32_t get_row_count() const { return _table_items->row_count; } diff --git a/be/src/exec/join_hash_map.tpp b/be/src/exec/join_hash_map.tpp index 0beb7610f41a3..390945e61867e 100644 --- a/be/src/exec/join_hash_map.tpp +++ b/be/src/exec/join_hash_map.tpp @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "column/vectorized_fwd.h" #include "simd/simd.h" #include "util/runtime_profile.h" @@ -59,7 +58,6 @@ void JoinBuildFunc::construct_hash_table(RuntimeState* state, JoinHashTableI if (table_items->key_columns[0]->is_nullable()) { auto* nullable_column = ColumnHelper::as_raw_column(table_items->key_columns[0]); auto& null_array = nullable_column->null_column()->get_data(); - DCHECK_EQ(data.size(), table_items->row_count + 1); for (size_t i = 1; i < table_items->row_count + 1; i++) { if (null_array[i] == 0) { uint32_t bucket_num = JoinHashMapHelper::calc_bucket_num(data[i], table_items->bucket_size); @@ -68,7 +66,6 @@ void JoinBuildFunc::construct_hash_table(RuntimeState* state, JoinHashTableI } } } else { - DCHECK_EQ(data.size(), table_items->row_count + 1); for (size_t i = 1; i < table_items->row_count + 1; i++) { uint32_t bucket_num = JoinHashMapHelper::calc_bucket_num(data[i], table_items->bucket_size); table_items->next[i] = table_items->first[bucket_num]; @@ -622,7 +619,7 @@ void JoinHashMap::_build_output(ChunkPtr* chunk) { bool need_output = is_lazy ? 
hash_table_slot.need_lazy_materialize : hash_table_slot.need_output; if (need_output) { - auto& column = _table_items->build_chunk->columns()[i]; + ColumnPtr& column = _table_items->build_chunk->columns()[i]; if (!column->is_nullable()) { _copy_build_column(column, chunk, slot, to_nullable); } else { @@ -687,10 +684,11 @@ void JoinHashMap::_copy_probe_nullable_column(ColumnPt } template -void JoinHashMap::_copy_build_column(const SegmentedColumnPtr& src_column, ChunkPtr* chunk, +void JoinHashMap::_copy_build_column(const ColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot, bool to_nullable) { if (to_nullable) { - auto data_column = src_column->clone_selective(_probe_state->build_index.data(), 0, _probe_state->count); + auto data_column = src_column->clone_empty(); + data_column->append_selective(*src_column, _probe_state->build_index.data(), 0, _probe_state->count); // When left outer join is executed, // build_index[i] Equal to 0 means it is not found in the hash table, @@ -706,15 +704,18 @@ void JoinHashMap::_copy_build_column(const SegmentedCo auto dest_column = NullableColumn::create(std::move(data_column), null_column); (*chunk)->append_column(std::move(dest_column), slot->id()); } else { - auto data_column = src_column->clone_selective(_probe_state->build_index.data(), 0, _probe_state->count); - (*chunk)->append_column(std::move(data_column), slot->id()); + auto dest_column = src_column->clone_empty(); + dest_column->append_selective(*src_column, _probe_state->build_index.data(), 0, _probe_state->count); + (*chunk)->append_column(std::move(dest_column), slot->id()); } } template -void JoinHashMap::_copy_build_nullable_column(const SegmentedColumnPtr& src_column, - ChunkPtr* chunk, const SlotDescriptor* slot) { - ColumnPtr dest_column = src_column->clone_selective(_probe_state->build_index.data(), 0, _probe_state->count); +void JoinHashMap::_copy_build_nullable_column(const ColumnPtr& src_column, ChunkPtr* chunk, + const SlotDescriptor* slot) { 
+ ColumnPtr dest_column = src_column->clone_empty(); + + dest_column->append_selective(*src_column, _probe_state->build_index.data(), 0, _probe_state->count); // When left outer join is executed, // build_index[i] Equal to 0 means it is not found in the hash table, diff --git a/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.h b/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.h index 77c253d1257f6..723f8232e39d7 100644 --- a/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.h +++ b/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.h @@ -66,7 +66,7 @@ class SpillableHashJoinBuildOperator final : public HashJoinBuildOperator { size_t _hash_table_iterate_idx = 0; std::vector _hash_tables; - SegmentedChunkSlice _hash_table_build_chunk_slice; + ChunkSharedSlice _hash_table_build_chunk_slice; std::function()> _hash_table_slice_iterator; bool _is_first_time_spill = true; DECLARE_ONCE_DETECTOR(_set_finishing_once);