Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor](schema change)Remove delete from sc #11441

Merged
merged 7 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 1 addition & 26 deletions be/src/olap/collect_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ CollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader,
if (LIKELY(rs_reader->type() == RowsetTypePB::BETA_ROWSET)) {
_refresh_current_row = &Level0Iterator::_refresh_current_row_v2;
} else {
_refresh_current_row = &Level0Iterator::_refresh_current_row_v1;
LOG(FATAL) << "Not supported rowset type";
}
}

Expand All @@ -220,31 +220,6 @@ int64_t CollectIterator::Level0Iterator::version() const {
return _rs_reader->version().second;
}

Status CollectIterator::Level0Iterator::_refresh_current_row_v1() {
do {
if (_row_block != nullptr && _row_block->has_remaining()) {
size_t pos = _row_block->pos();
_row_block->get_row(pos, &_row_cursor);
if (_row_block->block_status() == DEL_PARTIAL_SATISFIED &&
_reader->_delete_handler.is_filter_data(version(), _row_cursor)) {
_reader->_stats.rows_del_filtered++;
_row_block->pos_inc();
continue;
}
_current_row = &_row_cursor;
return Status::OK();
} else {
auto res = _rs_reader->next_block(&_row_block);
if (!res.ok()) {
_current_row = nullptr;
return res;
}
}
} while (_row_block != nullptr);
_current_row = nullptr;
return Status::OLAPInternalError(OLAP_ERR_DATA_EOF);
}

Status CollectIterator::Level0Iterator::_refresh_current_row_v2() {
do {
if (_row_block != nullptr && _row_block->has_remaining()) {
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/collect_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ class CollectIterator {
private:
Status (Level0Iterator::*_refresh_current_row)() = nullptr;

Status _refresh_current_row_v1();
Status _refresh_current_row_v2();

RowsetReaderSharedPtr _rs_reader;
Expand Down
13 changes: 0 additions & 13 deletions be/src/olap/delete_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,19 +310,6 @@ Status DeleteHandler::init(const TabletSchema& schema,
return Status::OK();
}

bool DeleteHandler::is_filter_data(const int64_t data_version, const RowCursor& row) const {
// According to semantics, the delete condition stored in _del_conds should be an OR relationship,
// so as long as the data matches one of the _del_conds, it will return true.
for (const auto& del_cond : _del_conds) {
if (data_version <= del_cond.filter_version &&
del_cond.del_cond->delete_conditions_eval(row)) {
return true;
}
}

return false;
}

std::vector<int64_t> DeleteHandler::get_conds_version() {
std::vector<int64_t> conds_version;
for (const auto& cond : _del_conds) {
Expand Down
16 changes: 1 addition & 15 deletions be/src/olap/delete_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ struct DeleteConditions {
// Status res;
// DeleteHandler delete_handler;
// res = delete_handler.init(tablet, condition_version);
// 2. Use it to check whether a row should be deleted:
// bool should_be_deleted = delete_handler.is_filter_data(data_version, row_cursor);
// 3. If there are multiple rows, you can invoke function is_filter_data multiple times:
// should_be_deleted = delete_handler.is_filter_data(data_version, row_cursor);
// 4. After all rows have been checked, you should release this object by calling:
// 2. After all rows have been checked, you should release this object by calling:
// delete_handler.finalize();
//
// NOTE:
Expand Down Expand Up @@ -96,16 +92,6 @@ class DeleteHandler {
Status init(const TabletSchema& schema, const std::vector<DeletePredicatePB>& delete_conditions,
int64_t version, const doris::TabletReader* = nullptr);

// Check whether a row should be deleted.
//
// input:
// * data_version: the version of this row
// * row: the row data to be checked
// return:
// * true: this row should be deleted
// * false: this row should NOT be deleted
bool is_filter_data(const int64_t data_version, const RowCursor& row) const;

// Return the delete conditions' size.
size_t conditions_num() const { return _del_conds.size(); }

Expand Down
68 changes: 0 additions & 68 deletions be/src/olap/olap_cond.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,46 +174,6 @@ Status Cond::init(const TCondition& tcond, const TabletColumn& column) {
return Status::OK();
}

bool Cond::eval(const RowCursorCell& cell) const {
if (cell.is_null() && op != OP_IS) {
//Any operation other than OP_IS operand and NULL is false
return false;
}

switch (op) {
case OP_EQ:
return operand_field->field()->compare_cell(*operand_field, cell) == 0;
case OP_NE:
return operand_field->field()->compare_cell(*operand_field, cell) != 0;
case OP_LT:
return operand_field->field()->compare_cell(*operand_field, cell) > 0;
case OP_LE:
return operand_field->field()->compare_cell(*operand_field, cell) >= 0;
case OP_GT:
return operand_field->field()->compare_cell(*operand_field, cell) < 0;
case OP_GE:
return operand_field->field()->compare_cell(*operand_field, cell) <= 0;
case OP_IN: {
WrapperField wrapperField(const_cast<Field*>(min_value_field->field()), cell);
auto ret = operand_set.find(&wrapperField) != operand_set.end();
wrapperField.release_field();
return ret;
}
case OP_NOT_IN: {
WrapperField wrapperField(const_cast<Field*>(min_value_field->field()), cell);
auto ret = operand_set.find(&wrapperField) == operand_set.end();
wrapperField.release_field();
return ret;
}
case OP_IS: {
return operand_field->is_null() == cell.is_null();
}
default:
// Unknown operation type, just return false
return false;
}
}

bool Cond::eval(const std::pair<WrapperField*, WrapperField*>& statistic) const {
//A single query condition filtered by a single column
// When we apply column statistic, Field can be NULL when type is Varchar,
Expand Down Expand Up @@ -510,18 +470,6 @@ Status CondColumn::add_cond(const TCondition& tcond, const TabletColumn& column)
return Status::OK();
}

bool CondColumn::eval(const RowCursor& row) const {
auto cell = row.cell(_col_index);
for (auto& each_cond : _conds) {
// As long as there is one condition not satisfied, we can return false
if (!each_cond->eval(cell)) {
return false;
}
}

return true;
}

bool CondColumn::eval(const std::pair<WrapperField*, WrapperField*>& statistic) const {
for (auto& each_cond : _conds) {
// As long as there is one condition not satisfied, we can return false
Expand Down Expand Up @@ -613,22 +561,6 @@ Status Conditions::append_condition(const TCondition& tcond) {
return cond_col->add_cond(tcond, column);
}

bool Conditions::delete_conditions_eval(const RowCursor& row) const {
if (_columns.empty()) {
return false;
}

for (auto& each_cond : _columns) {
if (_cond_column_is_key_or_duplicate(each_cond.second) && !each_cond.second->eval(row)) {
return false;
}
}

VLOG_NOTICE << "Row meets the delete conditions. "
<< "condition_count=" << _columns.size() << ", row=" << row.to_string();
return true;
}

CondColumn* Conditions::get_column(int32_t cid) const {
auto iter = _columns.find(cid);
if (iter != _columns.end()) {
Expand Down
9 changes: 0 additions & 9 deletions be/src/olap/olap_cond.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ struct Cond {

// 用一行数据的指定列同条件进行比较,如果符合过滤条件,
// 即按照此条件,行应被过滤掉,则返回true,否则返回false
bool eval(const RowCursorCell& cell) const;
bool eval(const KeyRange& statistic) const;

// 通过单列上的单个删除条件对version进行过滤
Expand Down Expand Up @@ -104,10 +103,6 @@ class CondColumn {

Status add_cond(const TCondition& tcond, const TabletColumn& column);

// 对一行数据中的指定列,用所有过滤条件进行比较,如果所有条件都满足,则过滤此行
// Return true means this row should be filtered out, otherwise return false
bool eval(const RowCursor& row) const;

// Return true if the rowset should be pruned
bool eval(const std::pair<WrapperField*, WrapperField*>& statistic) const;

Expand Down Expand Up @@ -171,10 +166,6 @@ class Conditions {
// 2. column类型是double, float
Status append_condition(const TCondition& condition);

// 通过所有列上的删除条件对RowCursor进行过滤
// Return true means this row should be filtered out, otherwise return false
bool delete_conditions_eval(const RowCursor& row) const;

const CondColumns& columns() const { return _columns; }

CondColumn* get_column(int32_t cid) const;
Expand Down
85 changes: 23 additions & 62 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,33 +268,29 @@ ColumnMapping* RowBlockChanger::get_mutable_column_mapping(size_t column_index)
return &(_schema_mapping[column_index]);
}

#define TYPE_REINTERPRET_CAST(FromType, ToType) \
{ \
size_t row_num = ref_block->row_block_info().row_num; \
for (size_t row = 0, mutable_row = 0; row < row_num; ++row) { \
if (is_data_left_vec[row] != 0) { \
char* ref_ptr = ref_block->field_ptr(row, ref_column); \
char* new_ptr = mutable_block->field_ptr(mutable_row++, i); \
*new_ptr = *ref_ptr; \
*(ToType*)(new_ptr + 1) = *(FromType*)(ref_ptr + 1); \
} \
} \
break; \
}

#define LARGEINT_REINTERPRET_CAST(FromType, ToType) \
{ \
size_t row_num = ref_block->row_block_info().row_num; \
for (size_t row = 0, mutable_row = 0; row < row_num; ++row) { \
if (is_data_left_vec[row] != 0) { \
char* ref_ptr = ref_block->field_ptr(row, ref_column); \
char* new_ptr = mutable_block->field_ptr(mutable_row++, i); \
*new_ptr = *ref_ptr; \
ToType new_value = *(FromType*)(ref_ptr + 1); \
memcpy(new_ptr + 1, &new_value, sizeof(ToType)); \
} \
} \
break; \
#define TYPE_REINTERPRET_CAST(FromType, ToType) \
{ \
size_t row_num = ref_block->row_block_info().row_num; \
for (size_t row = 0, mutable_row = 0; row < row_num; ++row) { \
char* ref_ptr = ref_block->field_ptr(row, ref_column); \
char* new_ptr = mutable_block->field_ptr(mutable_row++, i); \
*new_ptr = *ref_ptr; \
*(ToType*)(new_ptr + 1) = *(FromType*)(ref_ptr + 1); \
} \
break; \
}

#define LARGEINT_REINTERPRET_CAST(FromType, ToType) \
{ \
size_t row_num = ref_block->row_block_info().row_num; \
for (size_t row = 0, mutable_row = 0; row < row_num; ++row) { \
char* ref_ptr = ref_block->field_ptr(row, ref_column); \
char* new_ptr = mutable_block->field_ptr(mutable_row++, i); \
*new_ptr = *ref_ptr; \
ToType new_value = *(FromType*)(ref_ptr + 1); \
memcpy(new_ptr + 1, &new_value, sizeof(ToType)); \
} \
break; \
}

#define CONVERT_FROM_TYPE(from_type) \
Expand Down Expand Up @@ -615,27 +611,10 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data
// a.1 First determine whether the data needs to be filtered, and finally only those marked as 1 are left as needed
// For those without filter, it is equivalent to leave after setting all to 1
const uint32_t row_num = ref_block->row_block_info().row_num;
// (0 means no need to filter out, 1 means yes, during the process 2 means that this row needs to be cut and there is no need to compare other columns later)
std::vector<int8_t> is_data_left_vec(row_num, 1);

// Compare each row
for (size_t row_index = 0; row_index < row_num; ++row_index) {
ref_block->get_row(row_index, &read_helper);

// filter data according to delete conditions specified in DeleteData command
if (is_data_left_vec[row_index] == 1) {
if (_delete_handler != nullptr &&
_delete_handler->is_filter_data(data_version, read_helper)) {
is_data_left_vec[row_index] = 0;
(*filtered_rows)++;
}
}
}

// a.2 Calculate the left row num
uint32_t new_row_num = row_num - *filtered_rows;

const bool need_filter_data = (new_row_num != row_num);
const bool filter_all = (new_row_num == 0);

MemPool* mem_pool = mutable_block->mem_pool();
Expand All @@ -662,10 +641,6 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data
<< _schema_mapping[i].materialized_function;
for (size_t row_index = 0, new_row_index = 0;
row_index < ref_block->row_block_info().row_num; ++row_index) {
// No need row, need to be filter
if (need_filter_data && is_data_left_vec[row_index] == 0) {
continue;
}
mutable_block->get_row(new_row_index++, &write_helper);
ref_block->get_row(row_index, &read_helper);

Expand All @@ -686,11 +661,6 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data
// Low efficiency, you can also directly calculate the variable length domain copy, but it will still destroy the package
for (size_t row_index = 0, new_row_index = 0;
row_index < ref_block->row_block_info().row_num; ++row_index) {
// Unneeded row, skip every time this row is processed
if (need_filter_data && is_data_left_vec[row_index] == 0) {
continue;
}

// Specify the new row index to be written (different from the read row_index)
mutable_block->get_row(new_row_index++, &write_helper);
ref_block->get_row(row_index, &read_helper);
Expand Down Expand Up @@ -720,10 +690,6 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data
} else if (ConvertTypeResolver::instance()->get_convert_type_info(reftype, newtype)) {
for (size_t row_index = 0, new_row_index = 0;
row_index < ref_block->row_block_info().row_num; ++row_index) {
// Skip filtered rows
if (need_filter_data && is_data_left_vec[row_index] == 0) {
continue;
}
mutable_block->get_row(new_row_index++, &write_helper);
ref_block->get_row(row_index, &read_helper);
if (read_helper.is_null(ref_column)) {
Expand Down Expand Up @@ -786,11 +752,6 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data
// New column, write default value
for (size_t row_index = 0, new_row_index = 0;
row_index < ref_block->row_block_info().row_num; ++row_index) {
// Unneeded row, skip every time this row is processed
if (need_filter_data && is_data_left_vec[row_index] == 0) {
continue;
}

mutable_block->get_row(new_row_index++, &write_helper);

if (_schema_mapping[i].default_value->is_null()) {
Expand Down
9 changes: 0 additions & 9 deletions be/test/olap/delete_handler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -961,15 +961,12 @@ TEST_F(TestDeleteHandler, FilterDataSubconditions) {
OlapTuple tuple1(data_str);
res = _data_row_cursor.from_tuple(tuple1);
EXPECT_EQ(Status::OK(), res);
EXPECT_TRUE(_delete_handler.is_filter_data(1, _data_row_cursor));

// 构造一行测试数据
data_str[1] = "4";
OlapTuple tuple2(data_str);
res = _data_row_cursor.from_tuple(tuple2);
EXPECT_EQ(Status::OK(), res);
// 不满足子条件:k2!=4
EXPECT_FALSE(_delete_handler.is_filter_data(1, _data_row_cursor));

_delete_handler.finalize();
}
Expand Down Expand Up @@ -1048,8 +1045,6 @@ TEST_F(TestDeleteHandler, FilterDataConditions) {
OlapTuple tuple(data_str);
res = _data_row_cursor.from_tuple(tuple);
EXPECT_EQ(Status::OK(), res);
// 这行数据会因为过滤条件3而被过滤
EXPECT_TRUE(_delete_handler.is_filter_data(3, _data_row_cursor));

_delete_handler.finalize();
}
Expand Down Expand Up @@ -1114,10 +1109,6 @@ TEST_F(TestDeleteHandler, FilterDataVersion) {
OlapTuple tuple(data_str);
res = _data_row_cursor.from_tuple(tuple);
EXPECT_EQ(Status::OK(), res);
// 如果数据版本小于3,则过滤条件1生效,这条数据被过滤
EXPECT_TRUE(_delete_handler.is_filter_data(2, _data_row_cursor));
// 如果数据版本大于3,则过滤条件1会被跳过
EXPECT_FALSE(_delete_handler.is_filter_data(4, _data_row_cursor));

_delete_handler.finalize();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1
3 3
4 4

-- !sql --
1 1
3 3
4 4
5 abc

Loading