diff --git a/be/src/bench/shuffle_chunk_bench.cpp b/be/src/bench/shuffle_chunk_bench.cpp index d27019b979cf1..4a0d13769c026 100644 --- a/be/src/bench/shuffle_chunk_bench.cpp +++ b/be/src/bench/shuffle_chunk_bench.cpp @@ -16,14 +16,14 @@ #include #include +#include #include "column/chunk.h" #include "column/column_helper.h" -#include "column/datum_tuple.h" -#include "common/config.h" -#include "runtime/chunk_cursor.h" -#include "runtime/runtime_state.h" +#include "column/vectorized_fwd.h" #include "runtime/types.h" +#include "storage/chunk_helper.h" +#include "types/logical_type.h" namespace starrocks { @@ -196,6 +196,181 @@ static void bench_func(benchmark::State& state) { perf.do_bench(state); } +// Benchmark SegmentedColumn::clone_selective && Chunk::append_selective function +class SegmentedChunkPerf { +public: + SegmentedChunkPerf() = default; + + void prepare_bench_segmented_chunk_clone(benchmark::State& state) { + // std::cerr << "chunk_size: " << _dest_chunk_size << std::endl; + // std::cerr << "segment_size: " << _segment_size << std::endl; + // std::cerr << "segmented_chunk_size: " << _segment_chunk_size << std::endl; + SegmentedChunkPtr seg_chunk = prepare_chunk(); + CHECK_EQ(seg_chunk->num_rows(), _segment_chunk_size); + + // random select + random_select(select, _dest_chunk_size, seg_chunk->num_rows()); + } + + void prepare_bench_chunk_clone(benchmark::State& state) { + ChunkPtr chunk = build_chunk(_segment_size); + CHECK_EQ(chunk->num_rows(), _segment_size); + random_select(select, _dest_chunk_size, chunk->num_rows()); + } + + void prepare(benchmark::State& state) { + state.PauseTiming(); + + _column_count = state.range(0); + _data_type = state.range(1); + _num_segments = state.range(2); + _types.clear(); + + prepare_bench_chunk_clone(state); + prepare_bench_segmented_chunk_clone(state); + + state.ResumeTiming(); + } + + void do_bench_segmented_chunk_clone(benchmark::State& state) { + SegmentedChunkPtr seg_chunk = prepare_chunk(); + // clone_selective + size_t items = 0; + for (auto _ : state) { + for (auto& column : seg_chunk->columns()) { + auto cloned = column->clone_selective(select.data(), 0, select.size()); + } + items += select.size(); + } + state.SetItemsProcessed(items); + } + + void do_bench_chunk_clone(benchmark::State& state) { + ChunkPtr chunk = prepare_big_chunk(); + size_t items = 0; + for (auto _ : state) { + ChunkPtr empty = chunk->clone_empty(); + empty->append_selective(*chunk, select.data(), 0, select.size()); + items += select.size(); + } + state.SetItemsProcessed(items); + } + + ChunkPtr prepare_big_chunk() { + if (_big_chunk) { + return _big_chunk; + } + _big_chunk = build_chunk(_segment_chunk_size); + return _big_chunk; + } + + SegmentedChunkPtr prepare_chunk() { + if (_seg_chunk) { + return _seg_chunk; + } + ChunkPtr chunk = build_chunk(_dest_chunk_size); + + for (int i = 0; i < (_segment_chunk_size / _dest_chunk_size); i++) { + if (!_seg_chunk) { + _seg_chunk = SegmentedChunk::create(_segment_size); + ChunkPtr chunk = build_chunk(_dest_chunk_size); + auto map = chunk->get_slot_id_to_index_map(); + for (auto entry : map) { + _seg_chunk->append_column(chunk->get_column_by_slot_id(entry.first), entry.first); + } + _seg_chunk->build_columns(); + } else { + // std::cerr << " append " << chunk->num_rows() << "rows, become " << _seg_chunk->num_rows() << std::endl; + _seg_chunk->append_chunk(chunk); + } + } + return _seg_chunk; + } + + void random_select(std::vector& select, size_t count, size_t range) { + select.resize(count); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, range - 1); + std::generate(select.begin(), select.end(), [&]() { return dis(gen); }); + } + + ChunkPtr build_chunk(size_t chunk_size) { + if (_types.empty()) { + for (int i = 0; i < _column_count; i++) { + if (_data_type == 0) { + _types.emplace_back(TypeDescriptor::create_varchar_type(128)); + } else if (_data_type == 1) { + _types.emplace_back(LogicalType::TYPE_INT); + } else { + CHECK(false) << "data type not supported: " << _data_type; + } + } + } + + auto chunk = std::make_unique(); + for (int i = 0; i < _column_count; i++) { + auto col = init_dest_column(_types[i], chunk_size); + chunk->append_column(col, i); + } + return chunk; + } + + ColumnPtr init_dest_column(const TypeDescriptor& type, size_t chunk_size) { + auto c1 = ColumnHelper::create_column(type, true); + c1->reserve(chunk_size); + for (int i = 0; i < chunk_size; i++) { + if (type.is_string_type()) { + std::string str = fmt::format("str{}", i); + c1->append_datum(Slice(str)); + } else if (type.is_integer_type()) { + c1->append_datum(i); + } else { + CHECK(false) << "data type not supported"; + } + } + return c1; + } + +private: + int _column_count = 4; + int _data_type = 0; + size_t _dest_chunk_size = 4096; + size_t _segment_size = 65536; + size_t _num_segments = 10; + size_t _segment_chunk_size = _segment_size * _num_segments; + + SegmentedChunkPtr _seg_chunk; + ChunkPtr _big_chunk; + std::vector select; + std::vector _types; +}; + +static void BenchSegmentedChunkClone(benchmark::State& state) { + google::InstallFailureSignalHandler(); + auto perf = std::make_unique(); + perf->prepare(state); + perf->do_bench_segmented_chunk_clone(state); +} + +static void BenchChunkClone(benchmark::State& state) { + google::InstallFailureSignalHandler(); + auto perf = std::make_unique(); + perf->prepare(state); + perf->do_bench_chunk_clone(state); +} + +static std::vector> chunk_clone_args() { + return { + {1, 2, 3, 4}, // num columns + {0, 1}, // data type + {1, 4, 16, 64} // num_segments + }; +} + +BENCHMARK(BenchSegmentedChunkClone)->ArgsProduct(chunk_clone_args()); +BENCHMARK(BenchChunkClone)->ArgsProduct(chunk_clone_args()); + static void process_args(benchmark::internal::Benchmark* b) { // chunk_count, column_count, node_count, src_chunk_size, null percent b->Args({400, 400, 140, 4096, 80}); diff --git a/be/src/column/binary_column.cpp b/be/src/column/binary_column.cpp index e209e548defa0..3861ccfd8f979 100644 --- a/be/src/column/binary_column.cpp +++ b/be/src/column/binary_column.cpp @@ -45,8 +45,16 @@ void BinaryColumnBase::check_or_die() const { } } +template +void BinaryColumnBase::append(const Slice& str) { + _bytes.insert(_bytes.end(), str.data, str.data + str.size); + _offsets.emplace_back(_bytes.size()); + _slices_cache = false; +} + template void BinaryColumnBase::append(const Column& src, size_t offset, size_t count) { + DCHECK(offset + count <= src.size()); const auto& b = down_cast&>(src); const unsigned char* p = &b._bytes[b._offsets[offset]]; const unsigned char* e = &b._bytes[b._offsets[offset + count]]; diff --git a/be/src/column/binary_column.h b/be/src/column/binary_column.h index 09bfac117a058..183622bb21dfc 100644 --- a/be/src/column/binary_column.h +++ b/be/src/column/binary_column.h @@ -32,8 +32,13 @@ class BinaryColumnBase final : public ColumnFactory> using Offset = T; using Offsets = Buffer; +<<<<<<< HEAD using Bytes = starrocks::raw::RawVectorPad16; +======= + using Byte = uint8_t; + using Bytes = starrocks::raw::RawVectorPad16>; +>>>>>>> 5dd0cc5154 ([Enhancement] split chunk of HashTable (#51175)) struct BinaryDataProxyContainer { BinaryDataProxyContainer(const BinaryColumnBase& column) : _column(column) {} @@ -172,11 +177,7 @@ class BinaryColumnBase final : public ColumnFactory> // No complain about the overloaded-virtual for this function DIAGNOSTIC_PUSH DIAGNOSTIC_IGNORE("-Woverloaded-virtual") - void append(const Slice& str) { - _bytes.insert(_bytes.end(), str.data, str.data + str.size); - _offsets.emplace_back(_bytes.size()); - _slices_cache = false; - } + void append(const Slice& str); DIAGNOSTIC_POP void append_datum(const Datum& datum) override { diff --git a/be/src/column/column_helper.cpp b/be/src/column/column_helper.cpp index 2cb9675f9288e..136a89e614061 100644 --- a/be/src/column/column_helper.cpp +++ b/be/src/column/column_helper.cpp @@ -25,6 +25,7 @@ #include "column/vectorized_fwd.h" #include "gutil/casts.h" #include "simd/simd.h" +#include "storage/chunk_helper.h" #include "types/logical_type_infra.h" #include "util/date_func.h" #include "util/percentile_value.h" @@ -469,7 +470,7 @@ size_t ChunkSliceTemplate::skip(size_t skip_rows) { // Cutoff required rows from this chunk template -Ptr ChunkSliceTemplate::cutoff(size_t required_rows) { +ChunkUniquePtr ChunkSliceTemplate::cutoff(size_t required_rows) { DCHECK(!empty()); size_t cut_rows = std::min(rows(), required_rows); auto res = chunk->clone_empty(cut_rows); @@ -482,7 +483,31 @@ Ptr ChunkSliceTemplate::cutoff(size_t required_rows) { return res; } +// Specialized for SegmentedChunkPtr +template <> +ChunkUniquePtr ChunkSliceTemplate::cutoff(size_t required_rows) { + DCHECK(!empty()); + // cutoff a chunk from current segment, if it doesn't meet the requirement just let it be + ChunkPtr segment = chunk->segments()[segment_id]; + size_t segment_offset = offset % chunk->segment_size(); + size_t cut_rows = std::min(segment->num_rows() - segment_offset, required_rows); + + auto res = segment->clone_empty(cut_rows); + res->append(*segment, segment_offset, cut_rows); + offset += cut_rows; + + // move to next segment + segment_id = offset / chunk->segment_size(); + + if (empty()) { + chunk->reset(); + offset = 0; + } + return res; +} + template struct ChunkSliceTemplate; template struct ChunkSliceTemplate; +template struct ChunkSliceTemplate; } // namespace starrocks diff --git a/be/src/column/column_helper.h b/be/src/column/column_helper.h index a9c7dfb6e8242..4efe000bdcb4e 100644 --- a/be/src/column/column_helper.h +++ b/be/src/column/column_helper.h @@ -540,12 +540,13 @@ class ColumnHelper { template struct ChunkSliceTemplate { Ptr chunk; + size_t segment_id = 0; size_t offset = 0; bool empty() const; size_t rows() const; size_t skip(size_t skip_rows); - Ptr cutoff(size_t required_rows); + ChunkUniquePtr cutoff(size_t required_rows); void reset(Ptr input); }; @@ -575,5 +576,6 @@ APPLY_FOR_ALL_STRING_TYPE(GET_CONTAINER) using ChunkSlice = ChunkSliceTemplate; using ChunkSharedSlice = ChunkSliceTemplate; +using SegmentedChunkSlice = ChunkSliceTemplate; } // namespace starrocks diff --git a/be/src/column/const_column.h b/be/src/column/const_column.h index fcababccb3d46..46f0cd35413cd 100644 --- a/be/src/column/const_column.h +++ b/be/src/column/const_column.h @@ -25,6 +25,8 @@ class ConstColumn final : public ColumnFactory { friend class ColumnFactory; public: + using ValueType = void; + explicit ConstColumn(ColumnPtr data_column); ConstColumn(ColumnPtr data_column, size_t size); diff --git a/be/src/column/nullable_column.h b/be/src/column/nullable_column.h index 03d6295d93b46..9d7774a140730 100644 --- a/be/src/column/nullable_column.h +++ b/be/src/column/nullable_column.h @@ -33,6 +33,8 @@ class NullableColumn : public ColumnFactory { friend class ColumnFactory; public: + using ValueType = void; + inline static ColumnPtr wrap_if_necessary(ColumnPtr column) { if (column->is_nullable()) { return column; diff --git a/be/src/column/vectorized_fwd.h b/be/src/column/vectorized_fwd.h index 7063a159d0a41..28b5e5c3f50cf 100644 --- a/be/src/column/vectorized_fwd.h +++ b/be/src/column/vectorized_fwd.h @@ -107,6 +107,12 @@ using ChunkPtr = std::shared_ptr; using ChunkUniquePtr = std::unique_ptr; using Chunks = std::vector; +class SegmentedColumn; +class SegmentedChunk; +using SegmentedColumnPtr = std::shared_ptr; +using SegmentedColumns = std::vector; +using SegmentedChunkPtr = std::shared_ptr; + using SchemaPtr = std::shared_ptr; using Fields = std::vector>; diff --git a/be/src/exec/join_hash_map.cpp b/be/src/exec/join_hash_map.cpp index db1752b31a1a7..f347234c42a71 100644 --- a/be/src/exec/join_hash_map.cpp +++ b/be/src/exec/join_hash_map.cpp @@ -331,7 +331,7 @@ void JoinHashTable::create(const HashTableParam& param) { _probe_state->output_build_column_timer = param.output_build_column_timer; } - _table_items->build_chunk = std::make_shared(); + _table_items->build_chunk = std::make_shared(param.build_chunk_segment_size); _table_items->with_other_conjunct = param.with_other_conjunct; _table_items->join_type = param.join_type; _table_items->mor_reader_mode = param.mor_reader_mode; @@ -482,6 +482,7 @@ void JoinHashTable::_init_build_column(const HashTableParam& param) { _table_items->build_column_count++; } } + _table_items->build_chunk->build_columns(); } void JoinHashTable::_init_mor_reader() { @@ -507,13 +508,9 @@ void JoinHashTable::_init_mor_reader() { void JoinHashTable::_init_join_keys() { for (const auto& key_desc : _table_items->join_keys) { - if (key_desc.col_ref) { - _table_items->key_columns.emplace_back(nullptr); - } else { - auto key_column = ColumnHelper::create_column(*key_desc.type, false); - key_column->append_default(); - _table_items->key_columns.emplace_back(key_column); - } + auto key_column = ColumnHelper::create_column(*key_desc.type, false); + key_column->append_default(); + _table_items->key_columns.emplace_back(key_column); } } @@ -541,12 +538,12 @@ Status JoinHashTable::build(RuntimeState* state) { RETURN_IF_ERROR(_table_items->build_chunk->upgrade_if_overflow()); _table_items->has_large_column = _table_items->build_chunk->has_large_column(); - // If the join key is column ref of build chunk, fetch from build chunk directly + // build key_columns size_t join_key_count = _table_items->join_keys.size(); for (size_t i = 0; i < join_key_count; i++) { if (_table_items->join_keys[i].col_ref != nullptr) { SlotId slot_id = _table_items->join_keys[i].col_ref->slot_id(); - _table_items->key_columns[i] = _table_items->build_chunk->get_column_by_slot_id(slot_id); + _table_items->key_columns[i] = _table_items->build_chunk->get_column_by_slot_id(slot_id)->materialize(); } } @@ -622,19 +619,15 @@ Status JoinHashTable::probe_remain(RuntimeState* state, ChunkPtr* chunk, bool* e } void JoinHashTable::append_chunk(const ChunkPtr& chunk, const Columns& key_columns) { - Columns& columns = _table_items->build_chunk->columns(); - + // TODO: simplify the SlotId mapping, if the slot of input chunk is same as build_chunk, we don't need to remap them + std::vector slots; for (size_t i = 0; i < _table_items->build_column_count; i++) { SlotDescriptor* slot = _table_items->build_slots[i].slot; - ColumnPtr& column = chunk->get_column_by_slot_id(slot->id()); - - if (!columns[i]->is_nullable() && column->is_nullable()) { - // upgrade to nullable column - columns[i] = NullableColumn::create(columns[i], NullColumn::create(columns[i]->size(), 0)); - } - columns[i]->append(*column); + slots.push_back(slot->id()); } + _table_items->build_chunk->append_chunk(chunk, slots); + // TODO: it's useless for the optimizer, but there're stil some UT depending on it for (size_t i = 0; i < _table_items->key_columns.size(); i++) { // If the join key is slot ref, will get from build chunk directly, // otherwise will append from key_column of input @@ -650,9 +643,30 @@ void JoinHashTable::append_chunk(const ChunkPtr& chunk, const Columns& key_colum } _table_items->row_count += chunk->num_rows(); + DCHECK_EQ(_table_items->row_count + 1, _table_items->build_chunk->num_rows()); } +<<<<<<< HEAD StatusOr JoinHashTable::convert_to_spill_schema(const ChunkPtr& chunk) const { +======= +void JoinHashTable::merge_ht(const JoinHashTable& ht) { + _table_items->row_count += ht._table_items->row_count; + + auto& columns = _table_items->build_chunk->columns(); + auto& other_columns = ht._table_items->build_chunk->columns(); + + for (size_t i = 0; i < _table_items->build_column_count; i++) { + if (!columns[i]->is_nullable() && other_columns[i]->is_nullable()) { + // upgrade to nullable column + columns[i]->upgrade_to_nullable(); + } + } + _table_items->build_chunk->append(ht._table_items->build_chunk, 1); +} + +ChunkPtr JoinHashTable::convert_to_spill_schema(const ChunkPtr& chunk) const { + DCHECK(chunk != nullptr && chunk->num_rows() > 0); +>>>>>>> 5dd0cc5154 ([Enhancement] split chunk of HashTable (#51175)) ChunkPtr output = std::make_shared(); // for (size_t i = 0; i < _table_items->build_column_count; i++) { diff --git a/be/src/exec/join_hash_map.h b/be/src/exec/join_hash_map.h index 3efb212174450..f6ed17e43515c 100644 --- a/be/src/exec/join_hash_map.h +++ b/be/src/exec/join_hash_map.h @@ -14,6 +14,11 @@ #pragma once +<<<<<<< HEAD +======= +#include "storage/chunk_helper.h" +#include "util/runtime_profile.h" +>>>>>>> 5dd0cc5154 ([Enhancement] split chunk of HashTable (#51175)) #define JOIN_HASH_MAP_H #include @@ -98,8 +103,7 @@ struct HashTableSlotDescriptor { }; struct JoinHashTableItems { - //TODO: memory continues problem? - ChunkPtr build_chunk = nullptr; + SegmentedChunkPtr build_chunk = nullptr; Columns key_columns; Buffer build_slots; Buffer probe_slots; @@ -289,6 +293,9 @@ struct HashTableParam { RuntimeProfile::Counter* output_build_column_timer = nullptr; RuntimeProfile::Counter* output_probe_column_timer = nullptr; bool mor_reader_mode = false; + + // TODO: optimize this according to chunk width + size_t build_chunk_segment_size = 1 << 16; }; template @@ -683,9 +690,10 @@ class JoinHashMap { void _copy_probe_nullable_column(ColumnPtr* src_column, ChunkPtr* chunk, const SlotDescriptor* slot); - void _copy_build_column(const ColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot, bool to_nullable); + void _copy_build_column(const SegmentedColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot, + bool to_nullable); - void _copy_build_nullable_column(const ColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot); + void _copy_build_nullable_column(const SegmentedColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot); void _probe_index_output(ChunkPtr* chunk); void _build_index_output(ChunkPtr* chunk); @@ -832,7 +840,7 @@ class JoinHashTable { // convert input column to spill schema order StatusOr convert_to_spill_schema(const ChunkPtr& chunk) const; - const ChunkPtr& get_build_chunk() const { return _table_items->build_chunk; } + const SegmentedChunkPtr& get_build_chunk() const { return _table_items->build_chunk; } Columns& get_key_columns() { return _table_items->key_columns; } uint32_t get_row_count() const { return _table_items->row_count; } size_t get_probe_column_count() const { return _table_items->probe_column_count; } diff --git a/be/src/exec/join_hash_map.tpp b/be/src/exec/join_hash_map.tpp index 0e32f51aa4354..53559445c915a 100644 --- a/be/src/exec/join_hash_map.tpp +++ b/be/src/exec/join_hash_map.tpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "column/vectorized_fwd.h" #include "simd/simd.h" #define JOIN_HASH_MAP_TPP @@ -57,6 +58,7 @@ void JoinBuildFunc::construct_hash_table(RuntimeState* state, JoinHashTableI if (table_items->key_columns[0]->is_nullable()) { auto* nullable_column = ColumnHelper::as_raw_column(table_items->key_columns[0]); auto& null_array = nullable_column->null_column()->get_data(); + DCHECK_EQ(data.size(), table_items->row_count + 1); for (size_t i = 1; i < table_items->row_count + 1; i++) { if (null_array[i] == 0) { uint32_t bucket_num = JoinHashMapHelper::calc_bucket_num(data[i], table_items->bucket_size); @@ -65,6 +67,7 @@ void JoinBuildFunc::construct_hash_table(RuntimeState* state, JoinHashTableI } } } else { + DCHECK_EQ(data.size(), table_items->row_count + 1); for (size_t i = 1; i < table_items->row_count + 1; i++) { uint32_t bucket_num = JoinHashMapHelper::calc_bucket_num(data[i], table_items->bucket_size); table_items->next[i] = table_items->first[bucket_num]; @@ -618,7 +621,7 @@ void JoinHashMap::_build_output(ChunkPtr* chunk) { bool need_output = is_lazy ? hash_table_slot.need_lazy_materialize : hash_table_slot.need_output; if (need_output) { - ColumnPtr& column = _table_items->build_chunk->columns()[i]; + auto& column = _table_items->build_chunk->columns()[i]; if (!column->is_nullable()) { _copy_build_column(column, chunk, slot, to_nullable); } else { @@ -683,11 +686,10 @@ void JoinHashMap::_copy_probe_nullable_column(ColumnPt } template -void JoinHashMap::_copy_build_column(const ColumnPtr& src_column, ChunkPtr* chunk, +void JoinHashMap::_copy_build_column(const SegmentedColumnPtr& src_column, ChunkPtr* chunk, const SlotDescriptor* slot, bool to_nullable) { if (to_nullable) { - auto data_column = src_column->clone_empty(); - data_column->append_selective(*src_column, _probe_state->build_index.data(), 0, _probe_state->count); + auto data_column = src_column->clone_selective(_probe_state->build_index.data(), 0, _probe_state->count); // When left outer join is executed, // build_index[i] Equal to 0 means it is not found in the hash table, @@ -703,18 +705,15 @@ void JoinHashMap::_copy_build_column(const ColumnPtr& auto dest_column = NullableColumn::create(std::move(data_column), null_column); (*chunk)->append_column(std::move(dest_column), slot->id()); } else { - auto dest_column = src_column->clone_empty(); - dest_column->append_selective(*src_column, _probe_state->build_index.data(), 0, _probe_state->count); - (*chunk)->append_column(std::move(dest_column), slot->id()); + auto data_column = src_column->clone_selective(_probe_state->build_index.data(), 0, _probe_state->count); + (*chunk)->append_column(std::move(data_column), slot->id()); } } template -void JoinHashMap::_copy_build_nullable_column(const ColumnPtr& src_column, ChunkPtr* chunk, - const SlotDescriptor* slot) { - ColumnPtr dest_column = src_column->clone_empty(); - - dest_column->append_selective(*src_column, _probe_state->build_index.data(), 0, _probe_state->count); +void JoinHashMap::_copy_build_nullable_column(const SegmentedColumnPtr& src_column, + ChunkPtr* chunk, const SlotDescriptor* slot) { + ColumnPtr dest_column = src_column->clone_selective(_probe_state->build_index.data(), 0, _probe_state->count); // When left outer join is executed, // build_index[i] Equal to 0 means it is not found in the hash table, diff --git a/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.cpp b/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.cpp index 49e31c596b44d..6e6461b448551 100644 --- a/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.cpp +++ b/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.cpp @@ -238,7 +238,12 @@ std::function()> SpillableHashJoinBuildOperator::_convert_has _join_builder->hash_join_builder()->reset(_join_builder->hash_table_param()); return Status::EndOfFile("eos"); } +<<<<<<< HEAD auto chunk = _hash_table_build_chunk_slice.cutoff(runtime_state()->chunk_size()); +======= + + ChunkPtr chunk = _hash_table_build_chunk_slice.cutoff(runtime_state()->chunk_size()); +>>>>>>> 5dd0cc5154 ([Enhancement] split chunk of HashTable (#51175)) RETURN_IF_ERROR(chunk->downgrade()); RETURN_IF_ERROR(append_hash_columns(chunk)); _join_builder->update_build_rows(chunk->num_rows()); diff --git a/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.h b/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.h index 59170004d8d9d..9729054d01fa6 100644 --- a/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.h +++ b/be/src/exec/pipeline/hashjoin/spillable_hash_join_build_operator.h @@ -63,7 +63,13 @@ class SpillableHashJoinBuildOperator final : public HashJoinBuildOperator { Status init_spiller_partitions(RuntimeState* state, JoinHashTable& ht); +<<<<<<< HEAD ChunkSharedSlice _hash_table_build_chunk_slice; +======= + size_t _hash_table_iterate_idx = 0; + std::vector _hash_tables; + SegmentedChunkSlice _hash_table_build_chunk_slice; +>>>>>>> 5dd0cc5154 ([Enhancement] split chunk of HashTable (#51175)) std::function()> _hash_table_slice_iterator; bool _is_first_time_spill = true; DECLARE_ONCE_DETECTOR(_set_finishing_once); diff --git a/be/src/exec/spill/mem_table.cpp b/be/src/exec/spill/mem_table.cpp index c89f112d91e7d..98cd79270f4e4 100644 --- a/be/src/exec/spill/mem_table.cpp +++ b/be/src/exec/spill/mem_table.cpp @@ -167,7 +167,7 @@ Status OrderedMemTable::finalize(workgroup::YieldContext& yield_ctx, const Spill return Status::OK(); } SCOPED_RAW_TIMER(&yield_ctx.time_spent_ns); - auto chunk = _chunk_slice.cutoff(_runtime_state->chunk_size()); + ChunkPtr chunk = _chunk_slice.cutoff(_runtime_state->chunk_size()); bool need_aligned = _runtime_state->spill_enable_direct_io(); RETURN_IF_ERROR(serde->serialize(_runtime_state, serde_ctx, chunk, output, need_aligned)); diff --git a/be/src/storage/chunk_helper.cpp b/be/src/storage/chunk_helper.cpp index 52dce6f57794d..5d0d8301c4fec 100644 --- a/be/src/storage/chunk_helper.cpp +++ b/be/src/storage/chunk_helper.cpp @@ -14,6 +14,9 @@ #include "storage/chunk_helper.h" +#include +#include + #include "column/array_column.h" #include "column/chunk.h" #include "column/column_helper.h" @@ -23,6 +26,7 @@ #include "column/schema.h" #include "column/struct_column.h" #include "column/type_traits.h" +#include "column/vectorized_fwd.h" #include "gutil/strings/fastmem.h" #include "runtime/current_thread.h" #include "runtime/descriptors.h" @@ -631,4 +635,402 @@ bool ChunkPipelineAccumulator::is_finished() const { return _finalized && _out_chunk == nullptr && _in_chunk == nullptr; } +template +inline constexpr bool is_object = std::is_same_v || std::is_same_v || + std::is_same_v || std::is_same_v || + std::is_same_v, ColumnT>; + +// Selective-copy data from SegmentedColumn according to provided index +class SegmentedColumnSelectiveCopy final : public ColumnVisitorAdapter { +public: + SegmentedColumnSelectiveCopy(SegmentedColumnPtr segment_column, const uint32_t* indexes, uint32_t from, + uint32_t size) + : ColumnVisitorAdapter(this), + _segment_column(std::move(segment_column)), + _indexes(indexes), + _from(from), + _size(size) {} + + template + Status do_visit(const FixedLengthColumnBase& column) { + using ColumnT = FixedLengthColumnBase; + using ContainerT = typename ColumnT::Container; + + _result = column.clone_empty(); + auto output = ColumnHelper::as_column(_result); + + std::vector buffers; + auto columns = _segment_column->columns(); + for (auto& seg_column : columns) { + buffers.push_back(&ColumnHelper::as_column(seg_column)->get_data()); + } + + ContainerT& output_items = output->get_data(); + output_items.resize(_size); + for (uint32_t i = 0; i < _size; i++) { + uint32_t idx = _indexes[_from + i]; + auto [segment_id, segment_offset] = _segment_address(idx); + DCHECK_LT(segment_id, columns.size()); + DCHECK_LT(segment_offset, columns[segment_id]->size()); + + output_items[i] = (*buffers[segment_id])[segment_offset]; + } + return {}; + } + + // Implementation refers to BinaryColumn::append_selective + template + Status do_visit(const BinaryColumnBase& column) { + using ColumnT = BinaryColumnBase; + using ContainerT = typename ColumnT::Container*; + using Bytes = typename ColumnT::Bytes; + using Byte = typename ColumnT::Byte; + using Offsets = typename ColumnT::Offsets; + + _result = column.clone_empty(); + auto output = ColumnHelper::as_column(_result); + auto& output_offsets = output->get_offset(); + auto& output_bytes = output->get_bytes(); + + // input + auto columns = _segment_column->columns(); + std::vector input_bytes; + std::vector input_offsets; + for (auto& seg_column : columns) { + input_bytes.push_back(&ColumnHelper::as_column(seg_column)->get_bytes()); + input_offsets.push_back(&ColumnHelper::as_column(seg_column)->get_offset()); + } + +#ifndef NDEBUG + for (auto& src_col : columns) { + src_col->check_or_die(); + } +#endif + + // assign offsets + output_offsets.resize(_size + 1); + size_t num_bytes = 0; + for (size_t i = 0; i < _size; i++) { + uint32_t idx = _indexes[_from + i]; + auto [segment_id, segment_offset] = _segment_address(idx); + DCHECK_LT(segment_id, columns.size()); + DCHECK_LT(segment_offset, columns[segment_id]->size()); + + Offsets& src_offsets = *input_offsets[segment_id]; + Offset str_size = src_offsets[segment_offset + 1] - src_offsets[segment_offset]; + + output_offsets[i + 1] = output_offsets[i] + str_size; + num_bytes += str_size; + } + output_bytes.resize(num_bytes); + + // copy bytes + Byte* dest_bytes = output_bytes.data(); + for (size_t i = 0; i < _size; i++) { + uint32_t idx = _indexes[_from + i]; + auto [segment_id, segment_offset] = _segment_address(idx); + Bytes& src_bytes = *input_bytes[segment_id]; + Offsets& src_offsets = *input_offsets[segment_id]; + Offset str_size = src_offsets[segment_offset + 1] - src_offsets[segment_offset]; + Byte* str_data = src_bytes.data() + src_offsets[segment_offset]; + + strings::memcpy_inlined(dest_bytes + output_offsets[i], str_data, str_size); + } + +#ifndef NDEBUG + output->check_or_die(); +#endif + + return {}; + } + + // Inefficient fallback implementation, it's usually used for Array/Struct/Map/Json + template + typename std::enable_if_t, Status> do_visit(const ColumnT& column) { + _result = column.clone_empty(); + auto output = ColumnHelper::as_column(_result); + output->reserve(_size); + + auto columns = _segment_column->columns(); + for (uint32_t i = 0; i < _size; i++) { + uint32_t idx = _indexes[_from + i]; + auto [segment_id, segment_offset] = _segment_address(idx); + output->append(*columns[segment_id], segment_offset, 1); + } + return {}; + } + + Status do_visit(const NullableColumn& column) { + std::vector data_columns, null_columns; + for (auto& column : _segment_column->columns()) { + NullableColumn::Ptr nullable = ColumnHelper::as_column(column); + data_columns.push_back(nullable->data_column()); + null_columns.push_back(nullable->null_column()); + } + + auto segmented_data_column = std::make_shared(data_columns, _segment_column->segment_size()); + SegmentedColumnSelectiveCopy copy_data(segmented_data_column, _indexes, _from, _size); + (void)data_columns[0]->accept(©_data); + auto segmented_null_column = std::make_shared(null_columns, _segment_column->segment_size()); + SegmentedColumnSelectiveCopy copy_null(segmented_null_column, _indexes, _from, _size); + (void)null_columns[0]->accept(©_null); + _result = NullableColumn::create(copy_data.result(), ColumnHelper::as_column(copy_null.result())); + + return {}; + } + + Status do_visit(const ConstColumn& column) { return Status::NotSupported("SegmentedColumnVisitor"); } + + ColumnPtr result() { return _result; } + +private: + std::pair _segment_address(uint32 idx) { + size_t segment_size = _segment_column->segment_size(); + int segment_id = idx / segment_size; + int segment_offset = idx % segment_size; + return {segment_id, segment_offset}; + } + + SegmentedColumnPtr _segment_column; + ColumnPtr _result; + const uint32_t* _indexes; + uint32_t _from; + uint32_t _size; +}; + +SegmentedColumn::SegmentedColumn(SegmentedChunkPtr chunk, size_t column_index) + : _chunk(std::move(chunk)), _column_index(column_index), _segment_size(_chunk->segment_size()) {} + +SegmentedColumn::SegmentedColumn(std::vector columns, size_t segment_size) + : _segment_size(segment_size), _cached_columns(std::move(columns)) {} + +ColumnPtr SegmentedColumn::clone_selective(const uint32_t* indexes, uint32_t from, uint32_t size) { + SegmentedColumnSelectiveCopy visitor(shared_from_this(), indexes, from, size); + (void)columns()[0]->accept(&visitor); + return visitor.result(); +} + +ColumnPtr SegmentedColumn::materialize() const { + auto actual_columns = columns(); + if (actual_columns.empty()) { + return {}; + } + ColumnPtr result = actual_columns[0]->clone_empty(); + for (size_t i = 0; i < actual_columns.size(); i++) { + result->append(*actual_columns[i]); + } + return result; +} + +size_t SegmentedColumn::segment_size() const { + return _segment_size; +} + +size_t SegmentedChunk::segment_size() const { + return _segment_size; +} + +bool SegmentedColumn::is_nullable() const { + return columns()[0]->is_nullable(); +} + +bool SegmentedColumn::has_null() const { + for (auto& column : columns()) { + RETURN_IF(column->has_null(), true); + } + return false; +} + +size_t SegmentedColumn::size() const { + size_t result = 0; + for (auto& column : columns()) { + result += column->size(); + } + return result; +} + +std::vector SegmentedColumn::columns() const { + if (!_cached_columns.empty()) { + return _cached_columns; + } + std::vector columns; + for (auto& segment : _chunk->segments()) { + columns.push_back(segment->get_column_by_index(_column_index)); + } + return columns; +} + +void SegmentedColumn::upgrade_to_nullable() { + for (auto& segment : _chunk->segments()) { + auto& column = segment->get_column_by_index(_column_index); + column = NullableColumn::wrap_if_necessary(column); + } +} + +SegmentedChunk::SegmentedChunk(size_t segment_size) : _segment_size(segment_size) { + // put at least one chunk there + _segments.resize(1); + _segments[0] = std::make_shared(); +} + +SegmentedChunkPtr SegmentedChunk::create(size_t segment_size) { + return std::make_shared(segment_size); +} + +void SegmentedChunk::append_column(ColumnPtr column, SlotId slot_id) { + // It's only used when initializing the chunk, so append the column to first chunk is enough + DCHECK_EQ(_segments.size(), 1); + _segments[0]->append_column(std::move(column), slot_id); +} + +void SegmentedChunk::append_chunk(const ChunkPtr& chunk, const std::vector& slots) { + ChunkPtr open_segment = _segments.back(); + size_t append_rows = chunk->num_rows(); + size_t append_index = 0; + while (append_rows > 0) { + size_t open_segment_append_rows = std::min(_segment_size - open_segment->num_rows(), append_rows); + for (int i = 0; i < slots.size(); i++) { + SlotId slot = slots[i]; + ColumnPtr column = chunk->get_column_by_slot_id(slot); + open_segment->columns()[i]->append(*column, append_index, open_segment_append_rows); + } + append_index += open_segment_append_rows; + append_rows -= open_segment_append_rows; + if (open_segment->num_rows() == _segment_size) { + open_segment->check_or_die(); + open_segment = open_segment->clone_empty(); + _segments.emplace_back(open_segment); + } + } +} + +void SegmentedChunk::append_chunk(const ChunkPtr& chunk) { + ChunkPtr open_segment = _segments.back(); + size_t append_rows = chunk->num_rows(); + size_t append_index = 0; + while (append_rows > 0) { + size_t open_segment_append_rows = std::min(_segment_size - open_segment->num_rows(), append_rows); + open_segment->append_safe(*chunk, append_index, open_segment_append_rows); + append_index += open_segment_append_rows; + append_rows -= open_segment_append_rows; + if (open_segment->num_rows() == _segment_size) { + open_segment->check_or_die(); + open_segment = open_segment->clone_empty(); + _segments.emplace_back(open_segment); + } + } +} + +void SegmentedChunk::append(const SegmentedChunkPtr& chunk, size_t offset) { + auto& input_segments = chunk->segments(); + size_t segment_index = offset / chunk->_segment_size; + size_t segment_offset = offset % chunk->_segment_size; + for (size_t i = segment_index; i < chunk->num_segments(); i++) { + // The segment need to cutoff + if (i == segment_index && segment_offset > 0) { + auto cutoff = input_segments[i]->clone_empty(); + size_t count = input_segments[i]->num_rows() - segment_offset; + cutoff->append(*input_segments[i], segment_offset, count); + append_chunk(std::move(cutoff)); + } else { + append_chunk(input_segments[i]); + } + } + for (auto& segment : _segments) { + segment->check_or_die(); + } +} + +void SegmentedChunk::build_columns() { + DCHECK(_segments.size() >= 1); + size_t num_columns = _segments[0]->num_columns(); + for (int i = 0; i < num_columns; i++) { + _columns.emplace_back(std::make_shared(shared_from_this(), i)); + } +} + +size_t SegmentedChunk::memory_usage() const { + size_t result = 0; + for (auto& chunk : _segments) { + result += chunk->memory_usage(); + } + return result; +} + +size_t SegmentedChunk::num_rows() const { + size_t result = 0; + for (auto& chunk : _segments) { + result += chunk->num_rows(); + } + return result; +} + +SegmentedColumnPtr SegmentedChunk::get_column_by_slot_id(SlotId slot_id) { + DCHECK(!!_segments[0]); + auto& map = _segments[0]->get_slot_id_to_index_map(); + auto iter = map.find(slot_id); + if (iter == map.end()) { + return nullptr; + } + return _columns[iter->second]; +} + +const SegmentedColumns& SegmentedChunk::columns() const { + return _columns; +} + +SegmentedColumns& SegmentedChunk::columns() { + return _columns; +} + +Status SegmentedChunk::upgrade_if_overflow() { + for (auto& chunk : _segments) { + RETURN_IF_ERROR(chunk->upgrade_if_overflow()); + } + return {}; +} + +Status SegmentedChunk::downgrade() { + for (auto& chunk : _segments) { + RETURN_IF_ERROR(chunk->downgrade()); + } + return {}; +} + +bool SegmentedChunk::has_large_column() const { + for (auto& chunk : _segments) { + if (chunk->has_large_column()) { + return true; + } + } + return false; +} + +size_t SegmentedChunk::num_segments() const { + return _segments.size(); +} + +const std::vector& SegmentedChunk::segments() const { + return _segments; +} +std::vector& SegmentedChunk::segments() { + return _segments; +} + +ChunkUniquePtr SegmentedChunk::clone_empty(size_t reserve) { + return _segments[0]->clone_empty(reserve); +} + +void SegmentedChunk::reset() { + for (auto& chunk : _segments) { + chunk->reset(); + } +} + +void SegmentedChunk::check_or_die() { + for (auto& chunk : _segments) { + chunk->check_or_die(); + } +} + } // namespace starrocks diff --git a/be/src/storage/chunk_helper.h b/be/src/storage/chunk_helper.h index e8a33fcd5d4d4..13aee01c3e3ac 100644 --- a/be/src/storage/chunk_helper.h +++ b/be/src/storage/chunk_helper.h @@ -15,8 +15,13 @@ #pragma once #include +#include #include +#include "column/column_visitor.h" +#include "column/column_visitor_adapter.h" +#include "column/datum.h" +#include "column/fixed_length_column_base.h" #include "column/vectorized_fwd.h" #include "storage/olap_common.h" #include "storage/olap_type_infra.h" @@ -149,4 +154,67 @@ class ChunkPipelineAccumulator { bool _finalized = false; }; +class SegmentedColumn final : public std::enable_shared_from_this { +public: + SegmentedColumn(SegmentedChunkPtr chunk, size_t column_index); + SegmentedColumn(std::vector columns, size_t segment_size); + ~SegmentedColumn() = default; + + ColumnPtr clone_selective(const uint32_t* indexes, uint32_t from, uint32_t size); + ColumnPtr materialize() const; + + bool is_nullable() const; + bool has_null() const; + size_t size() const; + void upgrade_to_nullable(); + size_t segment_size() const; + std::vector columns() const; + +private: + SegmentedChunkPtr _chunk; // The chunk it belongs to + size_t _column_index; // The index in original chunk + const size_t _segment_size; + + std::vector _cached_columns; // Only used for SelectiveCopy +}; + +// A big-chunk would be segmented into multi small ones, to avoid allocating large-continuous memory +// It's not a transparent replacement for Chunk, but must be aware of and set a reasonale chunk_size +class SegmentedChunk final : public std::enable_shared_from_this { +public: + SegmentedChunk(size_t segment_size); + ~SegmentedChunk() = default; + + static SegmentedChunkPtr create(size_t segment_size); + + void append_column(ColumnPtr column, SlotId slot_id); + void append_chunk(const ChunkPtr& chunk, const std::vector& slots); + void append_chunk(const ChunkPtr& chunk); + void append(const SegmentedChunkPtr& chunk, size_t offset); + void build_columns(); + + SegmentedColumnPtr get_column_by_slot_id(SlotId slot_id); + const SegmentedColumns& columns() const; + SegmentedColumns& columns(); + size_t num_segments() const; + const std::vector& segments() const; + std::vector& segments(); + ChunkUniquePtr clone_empty(size_t reserve); + + size_t segment_size() const; + void reset(); + size_t memory_usage() const; + size_t num_rows() const; + Status upgrade_if_overflow(); + Status downgrade(); + bool has_large_column() const; + void check_or_die(); + +private: + std::vector _segments; + SegmentedColumns _columns; + + const size_t _segment_size; +}; + } // namespace starrocks diff --git a/be/test/storage/chunk_helper_test.cpp b/be/test/storage/chunk_helper_test.cpp index ea86762d3ff22..a32c5d3217427 100644 --- a/be/test/storage/chunk_helper_test.cpp +++ b/be/test/storage/chunk_helper_test.cpp @@ -16,7 +16,9 @@ #include "column/chunk.h" #include "column/column.h" +#include "column/column_helper.h" #include "column/nullable_column.h" +#include "column/vectorized_fwd.h" #include "common/object_pool.h" #include "gtest/gtest.h" #include "runtime/descriptor_helper.h" @@ -36,6 +38,8 @@ class ChunkHelperTest : public ::testing::Test { TSlotDescriptor _create_slot_desc(LogicalType type, const std::string& col_name, int col_pos); TupleDescriptor* _create_tuple_desc(); + SegmentedChunkPtr build_segmented_chunk(); + // A tuple with one column TupleDescriptor* _create_simple_desc() { TDescriptorTableBuilder table_builder; @@ -55,6 +59,26 @@ class ChunkHelperTest : public ::testing::Test { return tuple_desc; } + // A tuple with two column + TupleDescriptor* _create_simple_desc2() { + TDescriptorTableBuilder table_builder; + TTupleDescriptorBuilder tuple_builder; + + tuple_builder.add_slot(_create_slot_desc(LogicalType::TYPE_INT, "c0", 0)); + tuple_builder.add_slot(_create_slot_desc(LogicalType::TYPE_VARCHAR, "c1", 1)); + tuple_builder.build(&table_builder); + + std::vector row_tuples{0}; + DescriptorTbl* tbl = nullptr; + CHECK(DescriptorTbl::create(&_runtime_state, &_pool, table_builder.desc_tbl(), &tbl, config::vector_chunk_size) + .ok()); + + auto* row_desc = _pool.add(new RowDescriptor(*tbl, row_tuples)); + auto* tuple_desc = row_desc->tuple_descriptors()[0]; + + return tuple_desc; + } + RuntimeState _runtime_state; ObjectPool _pool; }; @@ -90,6 +114,29 @@ TupleDescriptor* ChunkHelperTest::_create_tuple_desc() { return tuple_desc; } +SegmentedChunkPtr ChunkHelperTest::build_segmented_chunk() { + auto* tuple_desc = _create_simple_desc2(); + auto segmented_chunk = SegmentedChunk::create(1 << 16); + segmented_chunk->append_column(Int32Column::create(), 0); + segmented_chunk->append_column(BinaryColumn::create(), 1); + segmented_chunk->build_columns(); + + // put 100 chunks into the segmented chunk + int row_id = 0; + for (int i = 0; i < 100; i++) { + size_t chunk_rows = 4096; + auto chunk = ChunkHelper::new_chunk(*tuple_desc, chunk_rows); + for (int j = 0; j < chunk_rows; j++) { + chunk->get_column_by_index(0)->append_datum(row_id++); + std::string str = fmt::format("str{}", row_id); + chunk->get_column_by_index(1)->append_datum(Slice(str)); + } + + segmented_chunk->append_chunk(std::move(chunk)); + } + return segmented_chunk; +} + TEST_F(ChunkHelperTest, new_chunk_with_tuple) { auto* tuple_desc = _create_tuple_desc(); @@ -185,6 +232,65 @@ TEST_F(ChunkHelperTest, Accumulator) { EXPECT_TRUE(accumulator.reach_limit()); } +TEST_F(ChunkHelperTest, SegmentedChunk) { + auto segmented_chunk = build_segmented_chunk(); + + [[maybe_unused]] auto downgrade_result = segmented_chunk->downgrade(); + [[maybe_unused]] auto upgrade_result = segmented_chunk->upgrade_if_overflow(); + + EXPECT_EQ(409600, segmented_chunk->num_rows()); + EXPECT_EQ(7, segmented_chunk->num_segments()); + EXPECT_EQ(8043542, segmented_chunk->memory_usage()); + EXPECT_EQ(2, segmented_chunk->columns().size()); + auto column0 = segmented_chunk->columns()[0]; + EXPECT_EQ(false, column0->is_nullable()); + EXPECT_EQ(false, column0->has_null()); + EXPECT_EQ(409600, column0->size()); + std::vector indexes = {1, 2, 4, 10000, 20000}; + ColumnPtr cloned = column0->clone_selective(indexes.data(), 0, indexes.size()); + EXPECT_EQ("[1, 2, 4, 10000, 20000]", cloned->debug_string()); + + // reset + segmented_chunk->reset(); + EXPECT_EQ(0, segmented_chunk->num_rows()); + EXPECT_EQ(7, segmented_chunk->num_segments()); + EXPECT_EQ(8043542, segmented_chunk->memory_usage()); + + // slicing + segmented_chunk = build_segmented_chunk(); + SegmentedChunkSlice slice; + slice.reset(segmented_chunk); + size_t total_rows = 0; + while (!slice.empty()) { + auto chunk = slice.cutoff(1000); + EXPECT_LE(chunk->num_rows(), 1000); + auto& slices = ColumnHelper::as_column(chunk->get_column_by_index(1))->get_data(); + for (int i = 0; i < chunk->num_rows(); i++) { + EXPECT_EQ(total_rows + i, chunk->get_column_by_index(0)->get(i).get_int32()); + EXPECT_EQ(fmt::format("str{}", total_rows + i + 1), slices[i].to_string()); + } + total_rows += chunk->num_rows(); + } + EXPECT_EQ(409600, total_rows); + EXPECT_EQ(0, segmented_chunk->num_rows()); + EXPECT_EQ(7, segmented_chunk->num_segments()); + segmented_chunk->check_or_die(); + + // append + auto seg1 = build_segmented_chunk(); + auto seg2 = build_segmented_chunk(); + seg1->append(seg2, 1); + EXPECT_EQ(409600 * 2 - 1, seg1->num_rows()); + seg1->check_or_die(); + // clone_selective + { + std::vector index{1, 2, 4, 10000, 20000}; + auto column1 = seg1->columns()[1]; + ColumnPtr str_column1 = column1->clone_selective(index.data(), 0, index.size()); + EXPECT_EQ("['str2', 'str3', 'str5', 'str10001', 'str20001']", str_column1->debug_string()); + } +} + class ChunkPipelineAccumulatorTest : public ::testing::Test { protected: ChunkPtr _generate_chunk(size_t rows, size_t cols, size_t reserve_size = 0);