[fix](partial update) Fix missing rowsets during alignment when flushing memtable due to compaction (apache#28062)
bobhan1 authored Dec 10, 2023
1 parent a3cd36c commit 485d7db
Showing 7 changed files with 182 additions and 4 deletions.
9 changes: 7 additions & 2 deletions be/src/olap/olap_common.h
@@ -493,13 +493,18 @@ inline RowsetId extract_rowset_id(std::string_view filename) {
 class DeleteBitmap;
 // merge on write context
 struct MowContext {
-    MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids,
+    MowContext(int64_t version, int64_t txnid, RowsetIdUnorderedSet& ids,
                std::shared_ptr<DeleteBitmap> db)
             : max_version(version), txn_id(txnid), rowset_ids(ids), delete_bitmap(db) {}
+    void update_rowset_ids_with_lock(std::function<void()> callback) {
+        std::lock_guard<std::mutex> lock(m);
+        callback();
+    }
     int64_t max_version;
     int64_t txn_id;
-    const RowsetIdUnorderedSet& rowset_ids;
+    RowsetIdUnorderedSet& rowset_ids;
     std::shared_ptr<DeleteBitmap> delete_bitmap;
+    std::mutex m; // protection for updating rowset_ids only
 };
 
 // used in mow partial update
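For orientation, a minimal standalone sketch of how the new update_rowset_ids_with_lock helper is used by the writers later in this diff (MowContextSketch and its members are simplified stand-ins for illustration, not Doris code): the callback clears and repopulates the shared rowset_ids while m is held, which is also why rowset_ids drops its const above.

// Standalone sketch; RowsetId / RowsetIdUnorderedSet are simplified stand-ins.
#include <cstdint>
#include <functional>
#include <iostream>
#include <mutex>
#include <unordered_set>

using RowsetId = int64_t;
using RowsetIdUnorderedSet = std::unordered_set<RowsetId>;

struct MowContextSketch {
    RowsetIdUnorderedSet& rowset_ids; // shared with concurrent flush threads
    std::mutex m;                     // protects updates to rowset_ids only

    void update_rowset_ids_with_lock(std::function<void()> callback) {
        std::lock_guard<std::mutex> lock(m);
        callback();
    }
};

int main() {
    RowsetIdUnorderedSet ids {1, 2, 3};
    MowContextSketch ctx {ids};
    // Mirrors the writer-side usage: clear and repopulate atomically.
    ctx.update_rowset_ids_with_lock([&]() {
        ctx.rowset_ids.clear();
        ctx.rowset_ids.insert({2, 3, 4}); // pretend these are the latest rowset ids
    });
    std::cout << ctx.rowset_ids.size() << "\n"; // prints 3
}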
27 changes: 27 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.cpp
@@ -182,6 +182,33 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
 {
     std::shared_lock meta_rlock(tablet->get_header_lock());
     specified_rowsets = tablet->get_rowset_by_ids(&_context.mow_context->rowset_ids);
+    DBUG_EXECUTE_IF("BetaRowsetWriter::_generate_delete_bitmap.clear_specified_rowsets",
+                    { specified_rowsets.clear(); });
+    if (specified_rowsets.size() != _context.mow_context->rowset_ids.size()) {
+        // `get_rowset_by_ids` may fail to find some of the rowsets we requested if cumulative compaction
+        // deletes rowsets from `_rs_version_map` (see `Tablet::modify_rowsets` for details) before we get
+        // here. Because we haven't begun calculation for the merge-on-write table, we can safely reset
+        // `_context.mow_context->rowset_ids` to the latest value and re-request the corresponding rowsets.
+        LOG(INFO) << fmt::format(
+                "[Memtable Flush] some rowsets have been deleted due to "
+                "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}), reset "
+                "rowset_ids to the latest value. tablet_id: {}, cur max_version: {}, "
+                "transaction_id: {}",
+                specified_rowsets.size(), _context.mow_context->rowset_ids.size(),
+                _context.tablet->tablet_id(), _context.mow_context->max_version,
+                _context.mow_context->txn_id);
+        Status st {Status::OK()};
+        _context.mow_context->update_rowset_ids_with_lock([&]() {
+            _context.mow_context->rowset_ids.clear();
+            st = tablet->all_rs_id(_context.mow_context->max_version,
+                                   &_context.mow_context->rowset_ids);
+        });
+        if (!st.ok()) {
+            return st;
+        }
+        specified_rowsets = tablet->get_rowset_by_ids(&_context.mow_context->rowset_ids);
+        DCHECK(specified_rowsets.size() == _context.mow_context->rowset_ids.size());
+    }
 }
 OlapStopWatch watch;
 RETURN_IF_ERROR(tablet->calc_delete_bitmap(rowset, segments, specified_rowsets,
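The same detect-and-refresh pattern recurs in segment_writer.cpp and vertical_segment_writer.cpp below. As a rough standalone sketch of the control flow (TabletLike and its two methods are hypothetical stand-ins for Tablet::get_rowset_by_ids and Tablet::all_rs_id, kept trivial so the sketch compiles on its own; the max_version argument is elided):

// Sketch of the detect-and-refresh flow added in this commit; not Doris code.
#include <cassert>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_set>
#include <vector>

using RowsetId = int64_t;
using RowsetIdUnorderedSet = std::unordered_set<RowsetId>;
struct Rowset {
    RowsetId id;
};

struct TabletLike {
    RowsetIdUnorderedSet live_ids; // rowsets that actually exist right now

    // Returns only the requested rowsets that still exist; after a concurrent
    // compaction this can be fewer than requested.
    std::vector<std::shared_ptr<Rowset>> get_rowset_by_ids(const RowsetIdUnorderedSet* ids) {
        std::vector<std::shared_ptr<Rowset>> out;
        for (RowsetId id : *ids) {
            if (live_ids.count(id) > 0) {
                out.push_back(std::make_shared<Rowset>(Rowset {id}));
            }
        }
        return out;
    }

    // Re-lists every rowset id currently visible.
    void all_rs_id(RowsetIdUnorderedSet* ids) { *ids = live_ids; }
};

int main() {
    TabletLike tablet;
    tablet.live_ids = {11, 12, 13};

    RowsetIdUnorderedSet rowset_ids {10, 11, 12}; // captured earlier; 10 was compacted away
    std::mutex m;                                 // plays the role of MowContext::m

    auto specified = tablet.get_rowset_by_ids(&rowset_ids);
    if (specified.size() != rowset_ids.size()) {
        // Some requested rowsets are gone: refresh the id set under the lock
        // and re-request, as the three writers in this commit do.
        {
            std::lock_guard<std::mutex> lock(m);
            rowset_ids.clear();
            tablet.all_rs_id(&rowset_ids);
        }
        specified = tablet.get_rowset_by_ids(&rowset_ids);
        assert(specified.size() == rowset_ids.size());
    }
    std::cout << "aligned rowsets: " << specified.size() << "\n"; // prints 3
}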
26 changes: 26 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -52,6 +52,7 @@
 #include "service/point_query_executor.h"
 #include "util/coding.h"
 #include "util/crc32c.h"
+#include "util/debug_points.h"
 #include "util/faststring.h"
 #include "util/key_util.h"
 #include "vec/columns/column_nullable.h"
@@ -411,6 +412,31 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
 {
     std::shared_lock rlock(tablet->get_header_lock());
     specified_rowsets = tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
+    DBUG_EXECUTE_IF("_append_block_with_partial_content.clear_specified_rowsets",
+                    { specified_rowsets.clear(); });
+    if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
+        // `get_rowset_by_ids` may fail to find some of the rowsets we requested if cumulative compaction
+        // deletes rowsets from `_rs_version_map` (see `Tablet::modify_rowsets` for details) before we get
+        // here. Because we haven't begun calculation for the merge-on-write table, we can safely reset
+        // `_mow_context->rowset_ids` to the latest value and re-request the corresponding rowsets.
+        LOG(INFO) << fmt::format(
+                "[Memtable Flush] some rowsets have been deleted due to "
+                "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}), reset "
+                "rowset_ids to the latest value. tablet_id: {}, cur max_version: {}, "
+                "transaction_id: {}",
+                specified_rowsets.size(), _mow_context->rowset_ids.size(), tablet->tablet_id(),
+                _mow_context->max_version, _mow_context->txn_id);
+        Status st {Status::OK()};
+        _mow_context->update_rowset_ids_with_lock([&]() {
+            _mow_context->rowset_ids.clear();
+            st = tablet->all_rs_id(_mow_context->max_version, &_mow_context->rowset_ids);
+        });
+        if (!st.ok()) {
+            return st;
+        }
+        specified_rowsets = tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
+        DCHECK(specified_rowsets.size() == _mow_context->rowset_ids.size());
+    }
 }
 std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
 // locate rows in base data
26 changes: 26 additions & 0 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -49,6 +49,7 @@
 #include "service/point_query_executor.h"
 #include "util/coding.h"
 #include "util/crc32c.h"
+#include "util/debug_points.h"
 #include "util/faststring.h"
 #include "util/key_util.h"
 #include "vec/columns/column_nullable.h"
@@ -344,6 +345,31 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
 {
     std::shared_lock rlock(tablet->get_header_lock());
     specified_rowsets = tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
+    DBUG_EXECUTE_IF("_append_block_with_partial_content.clear_specified_rowsets",
+                    { specified_rowsets.clear(); });
+    if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
+        // `get_rowset_by_ids` may fail to find some of the rowsets we requested if cumulative compaction
+        // deletes rowsets from `_rs_version_map` (see `Tablet::modify_rowsets` for details) before we get
+        // here. Because we haven't begun calculation for the merge-on-write table, we can safely reset
+        // `_mow_context->rowset_ids` to the latest value and re-request the corresponding rowsets.
+        LOG(INFO) << fmt::format(
+                "[Memtable Flush] some rowsets have been deleted due to "
+                "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}), reset "
+                "rowset_ids to the latest value. tablet_id: {}, cur max_version: {}, "
+                "transaction_id: {}",
+                specified_rowsets.size(), _mow_context->rowset_ids.size(), tablet->tablet_id(),
+                _mow_context->max_version, _mow_context->txn_id);
+        Status st {Status::OK()};
+        _mow_context->update_rowset_ids_with_lock([&]() {
+            _mow_context->rowset_ids.clear();
+            st = tablet->all_rs_id(_mow_context->max_version, &_mow_context->rowset_ids);
+        });
+        if (!st.ok()) {
+            return st;
+        }
+        specified_rowsets = tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
+        DCHECK(specified_rowsets.size() == _mow_context->rowset_ids.size());
+    }
 }
 std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
 // locate rows in base data
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !1 --
+1 doris 1000 123 1
+2 doris2 2000 223 1
+
+-- !2 --
+1 doris 200 123 1
+2 doris2 400 223 1
+4 yixiu 400 \N 4321
+
+-- !3 --
+1 doris 1000 123 1
+2 doris2 2000 223 1
+
+-- !4 --
+1 doris333 6666 555 4
+2 doris666 9999 888 7
+3 doris222 1111 987 567
+
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_unique_key_mow_rowsets_deleted", "nonConcurrent") {
+
+    def tableName = "test_unique_key_mow_rowsets_deleted1"
+
+    // 1. requested rowsets have been deleted during partial update
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE ${tableName} (
+            `id` int(11) NOT NULL COMMENT "user id",
+            `name` varchar(65533) NOT NULL DEFAULT "yixiu" COMMENT "user name",
+            `score` int(11) NOT NULL COMMENT "user score",
+            `test` int(11) NULL COMMENT "null test",
+            `dft` int(11) DEFAULT "4321")
+            UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true"); """
+
+    sql """insert into ${tableName} values(2, "doris2", 2000, 223, 1),(1, "doris", 1000, 123, 1)"""
+    qt_1 """ select * from ${tableName} order by id; """
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("_append_block_with_partial_content.clear_specified_rowsets")
+        sql "set enable_unique_key_partial_update=true;"
+        sql "set enable_insert_strict = false;"
+        sql "sync;"
+        sql """insert into ${tableName}(id,score) values(2,400),(1,200),(4,400)"""
+        qt_2 """ select * from ${tableName} order by id; """
+        sql "set enable_unique_key_partial_update=false;"
+        sql "set enable_insert_strict = true;"
+        sql "sync;"
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("_append_block_with_partial_content.clear_specified_rowsets")
+    }
+    sql "DROP TABLE IF EXISTS ${tableName};"
+
+
+    // 2. requested rowsets have been deleted during row update
+    tableName = "test_unique_key_mow_rowsets_deleted2"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE ${tableName} (
+            `id` int(11) NOT NULL COMMENT "user id",
+            `name` varchar(65533) NOT NULL DEFAULT "yixiu" COMMENT "user name",
+            `score` int(11) NOT NULL COMMENT "user score",
+            `test` int(11) NULL COMMENT "null test",
+            `dft` int(11) DEFAULT "4321")
+            UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true"); """
+    sql """insert into ${tableName} values(2, "doris2", 2000, 223, 1),(1, "doris", 1000, 123, 1)"""
+    qt_3 """ select * from ${tableName} order by id; """
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter::_generate_delete_bitmap.clear_specified_rowsets")
+        sql """insert into ${tableName} values(2, "doris666", 9999, 888, 7),(1, "doris333", 6666, 555, 4), (3, "doris222", 1111, 987, 567);"""
+        qt_4 """ select * from ${tableName} order by id; """
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("BetaRowsetWriter::_generate_delete_bitmap.clear_specified_rowsets")
+    }
+    sql "DROP TABLE IF EXISTS ${tableName};"
+}
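Both debug points exercised here correspond to the DBUG_EXECUTE_IF hooks added in the writer code above: clearing specified_rowsets fakes 'compaction deleted the rowsets we captured', forcing the refresh path deterministically. A minimal sketch of how such a hook can work (a simplified in-process registry; Doris's actual util/debug_points.h differs, and the regression framework toggles points on the BEs remotely, as the Http import above suggests):

// Simplified debug-point registry; illustrative only, not util/debug_points.h.
#include <cstdint>
#include <iostream>
#include <mutex>
#include <string>
#include <unordered_set>

class DebugPoints {
public:
    static DebugPoints& instance() {
        static DebugPoints dp;
        return dp;
    }
    void enable(const std::string& name) {
        std::lock_guard<std::mutex> lock(_mu);
        _enabled.insert(name);
    }
    void disable(const std::string& name) {
        std::lock_guard<std::mutex> lock(_mu);
        _enabled.erase(name);
    }
    bool is_enabled(const std::string& name) {
        std::lock_guard<std::mutex> lock(_mu);
        return _enabled.count(name) > 0;
    }

private:
    std::mutex _mu;
    std::unordered_set<std::string> _enabled;
};

// Shape of the macro: run `action` only while the named point is enabled.
#define DBUG_EXECUTE_IF_SKETCH(name, action) \
    if (DebugPoints::instance().is_enabled(name)) { action; }

int main() {
    std::unordered_set<int64_t> specified_rowsets {1, 2, 3};
    // The test enables this for all BEs before issuing the partial update.
    DebugPoints::instance().enable("_append_block_with_partial_content.clear_specified_rowsets");
    DBUG_EXECUTE_IF_SKETCH("_append_block_with_partial_content.clear_specified_rowsets",
                           { specified_rowsets.clear(); });
    std::cout << specified_rowsets.size() << "\n"; // prints 0 -> refresh path would run
}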
@@ -37,7 +37,7 @@ suite("test_primary_key_partial_update_publish", "p0") {

         file '10000.csv'
         time 10000 // limit inflight 10s
-    }
+    }
     streamLoad {
         table "${tableName}"
 
@@ -68,5 +68,5 @@ suite("test_primary_key_partial_update_publish", "p0") {
"""

// drop drop
// sql """ DROP TABLE IF EXISTS ${tableName} """
sql """ DROP TABLE IF EXISTS ${tableName} """
}
