Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
yiguolei committed Jan 15, 2025
1 parent 74de664 commit 765ac28
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 18 deletions.
35 changes: 18 additions & 17 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schem
}
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
_row_in_blocks = std::make_unique<DorisVector<RowInBlock*>>();
}

void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
Expand Down Expand Up @@ -152,7 +153,7 @@ MemTable::~MemTable() {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
g_memtable_cnt << -1;
if (_keys_type != KeysType::DUP_KEYS) {
for (auto it = _row_in_blocks.begin(); it != _row_in_blocks.end(); it++) {
for (auto it = _row_in_blocks->begin(); it != _row_in_blocks->end(); it++) {
if (!(*it)->has_init_agg()) {
continue;
}
Expand All @@ -165,13 +166,13 @@ MemTable::~MemTable() {
}
}
}
std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(),
std::for_each(_row_in_blocks->begin(), _row_in_blocks->end(),
std::default_delete<RowInBlock>());
// Arena has to be destroyed after agg state, because some agg state's memory may be
// allocated in arena.
_arena.reset();
_vec_row_comparator.reset();
_row_in_blocks.clear();
_row_in_blocks->reset();
_agg_functions.clear();
_input_mutable_block.clear();
_output_mutable_block.clear();
Expand Down Expand Up @@ -236,7 +237,7 @@ Status MemTable::insert(const vectorized::Block* input_block,
RETURN_IF_ERROR(_input_mutable_block.add_rows(input_block, row_idxs.data(),
row_idxs.data() + num_rows, &_column_offset));
for (int i = 0; i < num_rows; i++) {
_row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i});
_row_in_blocks->emplace_back(new RowInBlock {cursor_in_mutableblock + i});
}

_stat.raw_rows += num_rows;
Expand Down Expand Up @@ -294,8 +295,8 @@ Status MemTable::_put_into_output(vectorized::Block& in_block) {
DorisVector<uint32_t> row_pos_vec;
DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
row_pos_vec.reserve(in_block.rows());
for (int i = 0; i < _row_in_blocks.size(); i++) {
row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos);
for (int i = 0; i < _row_in_blocks->size(); i++) {
row_pos_vec.emplace_back((*_row_in_blocks)[i]->_row_pos);
}
return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
row_pos_vec.data() + in_block.rows());
Expand All @@ -306,19 +307,19 @@ size_t MemTable::_sort() {
_stat.sort_times++;
size_t same_keys_num = 0;
// sort new rows
Tie tie = Tie(_last_sorted_pos, _row_in_blocks.size());
Tie tie = Tie(_last_sorted_pos, _row_in_blocks->size());
for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) {
auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int {
return _input_mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1);
};
_sort_one_column(_row_in_blocks, tie, cmp);
_sort_one_column(*_row_in_blocks, tie, cmp);
}
bool is_dup = (_keys_type == KeysType::DUP_KEYS);
// sort extra round by _row_pos to make the sort stable
auto iter = tie.iter();
while (iter.next()) {
pdqsort(std::next(_row_in_blocks.begin(), iter.left()),
std::next(_row_in_blocks.begin(), iter.right()),
pdqsort(std::next(_row_in_blocks->begin(), iter.left()),
std::next(_row_in_blocks->begin(), iter.right()),
[&is_dup](const RowInBlock* lhs, const RowInBlock* rhs) -> bool {
return is_dup ? lhs->_row_pos > rhs->_row_pos : lhs->_row_pos < rhs->_row_pos;
});
Expand All @@ -336,9 +337,9 @@ size_t MemTable::_sort() {
return value < 0;
}
};
auto new_row_it = std::next(_row_in_blocks.begin(), _last_sorted_pos);
std::inplace_merge(_row_in_blocks.begin(), new_row_it, _row_in_blocks.end(), cmp_func);
_last_sorted_pos = _row_in_blocks.size();
auto new_row_it = std::next(_row_in_blocks->begin(), _last_sorted_pos);
std::inplace_merge(_row_in_blocks->begin(), new_row_it, _row_in_blocks->end(), cmp_func);
_last_sorted_pos = _row_in_blocks->size();
return same_keys_num;
}

Expand Down Expand Up @@ -492,7 +493,7 @@ void MemTable::_aggregate() {
};

if (!has_skip_bitmap_col || _seq_col_idx_in_block == -1) {
for (RowInBlock* cur_row : _row_in_blocks) {
for (RowInBlock* cur_row : *_row_in_blocks) {
if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row, cur_row) == 0) {
if (!prev_row->has_init_agg()) {
init_for_agg(prev_row);
Expand Down Expand Up @@ -551,7 +552,7 @@ void MemTable::_aggregate() {
auto& skip_bitmaps = assert_cast<vectorized::ColumnBitmap*>(
mutable_block.mutable_columns()[_skip_bitmap_col_idx].get())
->get_data();
for (auto* cur_row : _row_in_blocks) {
for (auto* cur_row : *_row_in_blocks) {
const BitmapValue& skip_bitmap = skip_bitmaps[cur_row->_row_pos];
bool with_seq_col = !skip_bitmap.contains(_seq_col_unique_id);
// compare keys, the keys of row_with_seq_col and row_with_seq_col is the same,
Expand Down Expand Up @@ -585,8 +586,8 @@ void MemTable::_aggregate() {
_output_mutable_block =
vectorized::MutableBlock::build_mutable_block(empty_input_block.get());
_output_mutable_block.clear_column_data();
_row_in_blocks = temp_row_in_blocks;
_last_sorted_pos = _row_in_blocks.size();
*_row_in_blocks = temp_row_in_blocks;
_last_sorted_pos = _row_in_blocks->size();
}
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class MemTable {
std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
std::vector<size_t> _offsets_of_aggregate_states;
size_t _total_size_of_aggregate_states;
DorisVector<RowInBlock*> _row_in_blocks;
std::unique_ptr<DorisVector<RowInBlock*>> _row_in_blocks;

size_t _num_columns;
int32_t _seq_col_idx_in_block = -1;
Expand Down

0 comments on commit 765ac28

Please sign in to comment.