diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 33c109d19bcc31..26c8c30110a1db 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1685,6 +1685,7 @@ static bool check_delete_bitmap_lock(MetaServiceCode& code, std::string& msg, st std::string lock_val; DeleteBitmapUpdateLockPB lock_info; auto err = txn->get(lock_key, &lock_val); + TEST_SYNC_POINT_CALLBACK("check_delete_bitmap_lock.inject_get_lock_key_err", &err); if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { msg = "lock id key not found"; code = MetaServiceCode::LOCK_EXPIRED; @@ -1701,6 +1702,7 @@ static bool check_delete_bitmap_lock(MetaServiceCode& code, std::string& msg, st msg = "failed to parse DeleteBitmapUpdateLockPB"; return false; } + TEST_SYNC_POINT_CALLBACK("check_delete_bitmap_lock.set_lock_info", &lock_info); if (lock_info.lock_id() != lock_id) { ss << "lock id not match, locked by lock_id=" << lock_info.lock_id(); msg = ss.str(); @@ -1877,7 +1879,7 @@ void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont request->lock_id(), request->initiator())) { LOG(WARNING) << "failed to check delete bitmap lock, table_id=" << table_id << " request lock_id=" << request->lock_id() - << " request initiator=" << request->initiator() << " msg" << msg; + << " request initiator=" << request->initiator() << " msg " << msg; return; } } @@ -2150,32 +2152,6 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl } } - bool require_tablet_stats = - request->has_require_compaction_stats() ? request->require_compaction_stats() : false; - if (require_tablet_stats) { - // this request is from fe when it commits txn for MOW table, we send the compaction stats - // along with the GetDeleteBitmapUpdateLockResponse which will be sent to BE later to let - // BE eliminate unnecessary sync_rowsets() calls if possible - for (const auto& tablet_index : request->tablet_indexes()) { - TabletIndexPB idx(tablet_index); - TabletStatsPB tablet_stat; - internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, tablet_stat, false); - if (code != MetaServiceCode::OK) { - response->clear_base_compaction_cnts(); - response->clear_cumulative_compaction_cnts(); - response->clear_cumulative_points(); - LOG_WARNING( - "failed to get tablet stats when get_delete_bitmap_update_lock, " - "lock_id={}, initiator={}, tablet_id={}", - request->lock_id(), request->initiator(), tablet_index.tablet_id()); - return; - } - response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt()); - response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt()); - response->add_cumulative_points(tablet_stat.cumulative_point()); - } - } - lock_info.set_lock_id(request->lock_id()); lock_info.set_expiration(now + request->expiration()); bool found = false; @@ -2205,6 +2181,74 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl msg = ss.str(); return; } + + bool require_tablet_stats = + request->has_require_compaction_stats() ? request->require_compaction_stats() : false; + if (!require_tablet_stats) return; + // this request is from fe when it commits txn for MOW table, we send the compaction stats + // along with the GetDeleteBitmapUpdateLockResponse which will be sent to BE later to let + // BE eliminate unnecessary sync_rowsets() calls if possible + + // 1. hold the delete bitmap update lock in MS(update lock_info.lock_id to current load's txn id) + // 2. read tablets' stats + // 3. check whether we still hold the delete bitmap update lock + // these steps can be done in different fdb txns + + StopWatch read_stats_sw; + err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + msg = "failed to init txn"; + return; + } + + for (const auto& tablet_idx : request->tablet_indexes()) { + TabletStatsPB tablet_stat; + std::string stats_key = + stats_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(), + tablet_idx.partition_id(), tablet_idx.tablet_id()}); + std::string stats_val; + TxnErrorCode err = txn->get(stats_key, &stats_val); + TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_update_lock.get_compaction_cnts_inject_error", + &err); + if (err == TxnErrorCode::TXN_TOO_OLD) { + code = MetaServiceCode::OK; + err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + ss << "failed to init txn when get tablet stats"; + msg = ss.str(); + return; + } + err = txn->get(stats_key, &stats_val); + } + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err, + tablet_idx.tablet_id()); + return; + } + if (!tablet_stat.ParseFromArray(stats_val.data(), stats_val.size())) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = fmt::format("marformed tablet stats value, key={}", hex(stats_key)); + return; + } + response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt()); + response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt()); + response->add_cumulative_points(tablet_stat.cumulative_point()); + } + + read_stats_sw.pause(); + LOG(INFO) << fmt::format("tablet_idxes.size()={}, read tablet compaction cnts cost={} ms", + request->tablet_indexes().size(), read_stats_sw.elapsed_us() / 1000); + + if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, table_id, request->lock_id(), + request->initiator())) { + LOG(WARNING) << "failed to check delete bitmap lock after get tablet stats, table_id=" + << table_id << " request lock_id=" << request->lock_id() + << " request initiator=" << request->initiator() << " code=" << code + << " msg=" << msg; + } } void MetaServiceImpl::remove_delete_bitmap_update_lock( diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index d2dd80f6871e3e..ce5eda9ed7a46c 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -216,6 +216,35 @@ static void update_tmp_rowset(MetaServiceProxy* meta_service, if (!arena) delete req; } +static void get_delete_bitmap_update_lock(MetaServiceProxy* meta_service, + GetDeleteBitmapUpdateLockResponse& res, int64_t db_id, + int64_t table_id, int64_t index_id, + const std::vector>& tablet_idxes, + int64_t expiration, int64_t lock_id, int64_t initiator, + bool require_compaction_stats) { + brpc::Controller cntl; + GetDeleteBitmapUpdateLockRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_table_id(table_id); + for (const auto& [partition_id, _] : tablet_idxes) { + req.add_partition_ids(partition_id); + } + req.set_expiration(expiration); + req.set_lock_id(lock_id); + req.set_initiator(initiator); + req.set_require_compaction_stats(require_compaction_stats); + for (const auto& [partition_id, tablet_id] : tablet_idxes) { + TabletIndexPB* idx = req.add_tablet_indexes(); + idx->set_db_id(db_id); + idx->set_table_id(table_id); + idx->set_index_id(index_id); + idx->set_partition_id(partition_id); + idx->set_tablet_id(tablet_id); + } + meta_service->get_delete_bitmap_update_lock( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); +} + static void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const std::string& label, int64_t table_id, int64_t partition_id, int64_t tablet_id) { int64_t txn_id = 0; @@ -230,6 +259,26 @@ static void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const s commit_txn(meta_service, db_id, txn_id, label); } +static void add_tablet_stats(MetaServiceProxy* meta_service, std::string instance_id, + int64_t table_id, int64_t index_id, + const std::vector>& tablet_idxes) { + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + + for (const auto& idx : tablet_idxes) { + int64_t partition_id = idx[0]; + int64_t tablet_id = idx[1]; + std::string stats_key = + stats_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); + TabletStatsPB stats; + stats.set_base_compaction_cnt(10); + stats.set_cumulative_compaction_cnt(20); + stats.set_cumulative_point(30); + txn->put(stats_key, stats.SerializeAsString()); + } + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); +} + TEST(MetaServiceTest, GetInstanceIdTest) { extern std::string get_instance_id(const std::shared_ptr& rc_mgr, const std::string& cloud_unique_id); @@ -4488,7 +4537,7 @@ TEST(MetaServiceTest, GetTabletStatsTest) { EXPECT_EQ(res.tablet_stats(0).segment_size(), 40000); } -TEST(MetaServiceTest, GetDeleteBitmapUpdateLock) { +TEST(MetaServiceTest, GetDeleteBitmapUpdateLockNoReadStats) { auto meta_service = get_meta_service(); brpc::Controller cntl; @@ -4530,6 +4579,231 @@ TEST(MetaServiceTest, GetDeleteBitmapUpdateLock) { ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } +TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsNormal) { + auto meta_service = get_meta_service(); + + std::string instance_id = "test_get_delete_bitmap_update_lock_normal"; + [[maybe_unused]] auto* sp = SyncPoint::get_instance(); + std::unique_ptr> defer((int*)0x01, [](int*) { + SyncPoint::get_instance()->disable_processing(); + SyncPoint::get_instance()->clear_all_call_backs(); + }); + sp->set_call_back("get_instance_id", [&](auto&& args) { + auto* ret = try_any_cast_ret(args); + ret->first = instance_id; + ret->second = true; + }); + sp->enable_processing(); + + int64_t db_id = 1000; + int64_t table_id = 2001; + int64_t index_id = 3001; + // [(partition_id, tablet_id)] + std::vector> tablet_idxes {{70001, 12345}, {80001, 3456}, {90001, 6789}}; + + add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes); + + GetDeleteBitmapUpdateLockResponse res; + get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id, tablet_idxes, + 5, 999999, -1, true); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + + ASSERT_EQ(res.base_compaction_cnts().size(), tablet_idxes.size()); + for (const auto& base_compaction_cnt : res.base_compaction_cnts()) { + ASSERT_EQ(base_compaction_cnt, 10); + } + ASSERT_EQ(res.cumulative_compaction_cnts().size(), tablet_idxes.size()); + for (const auto& cumu_compaction_cnt : res.cumulative_compaction_cnts()) { + ASSERT_EQ(cumu_compaction_cnt, 20); + } + ASSERT_EQ(res.cumulative_points().size(), tablet_idxes.size()); + for (const auto& cumulative_point : res.cumulative_points()) { + ASSERT_EQ(cumulative_point, 30); + } +} + +TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsLockExpired) { + auto meta_service = get_meta_service(); + + { + // 2.1 abnormal path, lock has been expired and taken by another load/compaction during + // the reading of tablet stats + std::string instance_id = "test_get_delete_bitmap_update_lock_abnormal1"; + [[maybe_unused]] auto* sp = SyncPoint::get_instance(); + std::unique_ptr> defer((int*)0x01, [](int*) { + SyncPoint::get_instance()->disable_processing(); + SyncPoint::get_instance()->clear_all_call_backs(); + }); + sp->set_call_back("get_instance_id", [&](auto&& args) { + auto* ret = try_any_cast_ret(args); + ret->first = instance_id; + ret->second = true; + }); + sp->set_call_back("check_delete_bitmap_lock.set_lock_info", [&](auto&& args) { + auto* lock_info = try_any_cast(args[0]); + // simulate that lock_id has been modified by another load + lock_info->set_lock_id(345); + LOG(INFO) << "change lock_info.lock_id to 345, lock_info=" << lock_info->DebugString(); + }); + + sp->enable_processing(); + + int64_t db_id = 1000; + int64_t table_id = 2001; + int64_t index_id = 3001; + // [(partition_id, tablet_id)] + std::vector> tablet_idxes { + {70001, 12345}, {80001, 3456}, {90001, 6789}}; + + add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes); + + GetDeleteBitmapUpdateLockResponse res; + get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id, + tablet_idxes, 5, 999999, -1, true); + ASSERT_EQ(res.status().code(), MetaServiceCode::LOCK_EXPIRED); + ASSERT_EQ(res.base_compaction_cnts().size(), 0); + ASSERT_EQ(res.cumulative_compaction_cnts().size(), 0); + ASSERT_EQ(res.cumulative_points().size(), 0); + } + + { + // 2.2 abnormal path, lock has been taken by another load/compaction and been released during + // the reading of tablet stats + std::string instance_id = "test_get_delete_bitmap_update_lock_abnormal2"; + [[maybe_unused]] auto* sp = SyncPoint::get_instance(); + std::unique_ptr> defer((int*)0x01, [](int*) { + SyncPoint::get_instance()->disable_processing(); + SyncPoint::get_instance()->clear_all_call_backs(); + }); + sp->set_call_back("get_instance_id", [&](auto&& args) { + auto* ret = try_any_cast_ret(args); + ret->first = instance_id; + ret->second = true; + }); + sp->set_call_back("check_delete_bitmap_lock.inject_get_lock_key_err", [&](auto&& args) { + auto* err = try_any_cast(args[0]); + // the lock has been taken by another load and been released, + // so the delete bitmap update lock KV will be removed + *err = TxnErrorCode::TXN_KEY_NOT_FOUND; + }); + + sp->enable_processing(); + + int64_t db_id = 1000; + int64_t table_id = 2001; + int64_t index_id = 3001; + // [(partition_id, tablet_id)] + std::vector> tablet_idxes { + {70001, 12345}, {80001, 3456}, {90001, 6789}}; + + add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes); + + GetDeleteBitmapUpdateLockResponse res; + get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id, + tablet_idxes, 5, 999999, -1, true); + ASSERT_EQ(res.status().code(), MetaServiceCode::LOCK_EXPIRED); + } +} + +TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsError) { + auto meta_service = get_meta_service(); + + { + // 2.3 abnormal path, meeting error when reading tablets' stats + std::string instance_id = "test_get_delete_bitmap_update_lock_abnormal3"; + [[maybe_unused]] auto* sp = SyncPoint::get_instance(); + std::unique_ptr> defer((int*)0x01, [](int*) { + SyncPoint::get_instance()->disable_processing(); + SyncPoint::get_instance()->clear_all_call_backs(); + }); + sp->set_call_back("get_instance_id", [&](auto&& args) { + auto* ret = try_any_cast_ret(args); + ret->first = instance_id; + ret->second = true; + }); + + TxnErrorCode injected_error_code {TxnErrorCode::TXN_KEY_NOT_FOUND}; + sp->set_call_back("get_delete_bitmap_update_lock.get_compaction_cnts_inject_error", + [&](auto&& args) { + auto* err = try_any_cast(args[0]); + *err = injected_error_code; + }); + + sp->enable_processing(); + + int64_t db_id = 1000; + int64_t table_id = 2001; + int64_t index_id = 3001; + // [(partition_id, tablet_id)] + std::vector> tablet_idxes { + {70001, 12345}, {80001, 3456}, {90001, 6789}}; + + add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes); + + GetDeleteBitmapUpdateLockResponse res; + get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id, + tablet_idxes, 5, 999999, -1, true); + ASSERT_EQ(res.status().code(), MetaServiceCode::KV_TXN_GET_ERR); + } + + { + // 2.4 abnormal path, meeting TXN_TOO_OLD error when reading tablets' stats, + // this should not fail if lock is not expired + std::string instance_id = "test_get_delete_bitmap_update_lock_abnormal4"; + [[maybe_unused]] auto* sp = SyncPoint::get_instance(); + std::unique_ptr> defer((int*)0x01, [](int*) { + SyncPoint::get_instance()->disable_processing(); + SyncPoint::get_instance()->clear_all_call_backs(); + }); + sp->set_call_back("get_instance_id", [&](auto&& args) { + auto* ret = try_any_cast_ret(args); + ret->first = instance_id; + ret->second = true; + }); + + int counter = 0; + sp->set_call_back("get_delete_bitmap_update_lock.get_compaction_cnts_inject_error", + [&](auto&& args) { + if (counter++ % 2 == 0) { + auto* err = try_any_cast(args[0]); + *err = TxnErrorCode::TXN_TOO_OLD; + } + }); + + sp->enable_processing(); + + int64_t db_id = 1000; + int64_t table_id = 2001; + int64_t index_id = 3001; + // [(partition_id, tablet_id)] + std::vector> tablet_idxes; + for (int i = 0; i < 20; i++) { + int64_t partition_id = 70000 + i; + int64_t tablet_id = 80000 + i; + tablet_idxes.push_back({partition_id, tablet_id}); + } + + add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes); + + GetDeleteBitmapUpdateLockResponse res; + get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id, + tablet_idxes, 5, 999999, -1, true); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + ASSERT_EQ(res.base_compaction_cnts().size(), tablet_idxes.size()); + for (const auto& base_compaction_cnt : res.base_compaction_cnts()) { + ASSERT_EQ(base_compaction_cnt, 10); + } + ASSERT_EQ(res.cumulative_compaction_cnts().size(), tablet_idxes.size()); + for (const auto& cumu_compaction_cnt : res.cumulative_compaction_cnts()) { + ASSERT_EQ(cumu_compaction_cnt, 20); + } + ASSERT_EQ(res.cumulative_points().size(), tablet_idxes.size()); + for (const auto& cumulative_point : res.cumulative_points()) { + ASSERT_EQ(cumulative_point, 30); + } + } +} + static std::string generate_random_string(int length) { std::string char_set = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; std::random_device rd; diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 97d75403ffcb6e..11a3f05ead70c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -922,14 +922,20 @@ private void getDeleteBitmapUpdateLock(Map> tableToParttions, lo List respBaseCompactionCnts = response.getBaseCompactionCntsList(); List respCumulativeCompactionCnts = response.getCumulativeCompactionCntsList(); List respCumulativePoints = response.getCumulativePointsList(); - if (!respBaseCompactionCnts.isEmpty() && !respCumulativeCompactionCnts.isEmpty() - && !respCumulativePoints.isEmpty()) { - for (int i = 0; i < tabletList.size(); i++) { - long tabletId = tabletList.get(i); - baseCompactionCnts.put(tabletId, respBaseCompactionCnts.get(i)); - cumulativeCompactionCnts.put(tabletId, respCumulativeCompactionCnts.get(i)); - cumulativePoints.put(tabletId, respCumulativePoints.get(i)); - } + int size1 = respBaseCompactionCnts.size(); + int size2 = respCumulativeCompactionCnts.size(); + int size3 = respCumulativePoints.size(); + if (size1 != tabletList.size() || size2 != tabletList.size() || size3 != tabletList.size()) { + throw new UserException("The size of returned compaction cnts can't match the size of tabletList, " + + "tabletList.size()=" + tabletList.size() + ", respBaseCompactionCnts.size()=" + size1 + + ", respCumulativeCompactionCnts.size()=" + size2 + ", respCumulativePoints.size()=" + + size3); + } + for (int i = 0; i < tabletList.size(); i++) { + long tabletId = tabletList.get(i); + baseCompactionCnts.put(tabletId, respBaseCompactionCnts.get(i)); + cumulativeCompactionCnts.put(tabletId, respCumulativeCompactionCnts.get(i)); + cumulativePoints.put(tabletId, respCumulativePoints.get(i)); } totalRetryTime += retryTime; }