Skip to content

Commit

Permalink
[BugFix] fix lock release issue in cloud native pk table (#53878)
Browse files Browse the repository at this point in the history
Signed-off-by: luohaha <[email protected]>
(cherry picked from commit 35043dc)
  • Loading branch information
luohaha authored and mergify[bot] committed Dec 12, 2024
1 parent 41ddaf2 commit 5483133
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
5 changes: 5 additions & 0 deletions be/src/storage/lake/update_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ StatusOr<IndexEntry*> UpdateManager::prepare_primary_index(
StarRocksMetrics::instance()->primary_key_table_error_state_total.increment(1);
builder->set_recover_flag(RecoverFlag::RECOVER_WITH_PUBLISH);
}
// If load failed, release lock guard and remove index entry
// MUST release lock guard before remove index entry
guard.reset(nullptr);
_index_cache.remove(index_entry);
std::string msg = strings::Substitute("prepare_primary_index: load primary index failed: $0", st.to_string());
LOG(ERROR) << msg;
Expand All @@ -156,6 +159,8 @@ StatusOr<IndexEntry*> UpdateManager::prepare_primary_index(
_block_cache->update_memory_usage();
st = index.prepare(EditVersion(new_version, 0), 0);
if (!st.ok()) {
// If prepare failed, release lock guard and remove index entry
guard.reset(nullptr);
_index_cache.remove(index_entry);
std::string msg =
strings::Substitute("prepare_primary_index: prepare primary index failed: $0", st.to_string());
Expand Down
49 changes: 49 additions & 0 deletions be/test/storage/lake/primary_key_publish_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,55 @@ TEST_P(LakePrimaryKeyPublishTest, test_recover_with_dels2) {
config::enable_primary_key_recover = false;
}

TEST_P(LakePrimaryKeyPublishTest, test_index_load_failure) {
auto [chunk0, indexes] = gen_data_and_index(kChunkSize, 0, true, true);
auto version = 1;
auto tablet_id = _tablet_metadata->id();
bool ingest_failure = true;
std::string sync_point = "lake_index_load.1";
SyncPoint::GetInstance()->SetCallBack(sync_point, [&](void* arg) {
if (ingest_failure) {
*(Status*)arg = Status::AlreadyExist("ut_test");
ingest_failure = false;
} else {
ingest_failure = true;
}
});
SyncPoint::GetInstance()->EnableProcessing();
for (int i = 0; i < 6; i++) {
int64_t txn_id = next_id();
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
.set_tablet_manager(_tablet_mgr.get())
.set_tablet_id(tablet_id)
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_slot_descriptors(&_slot_pointers)
.build());
ASSERT_OK(delta_writer->open());
// upsert
ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size()));
ASSERT_OK(delta_writer->finish_with_txnlog());
delta_writer->close();
// Publish version
ASSERT_FALSE(publish_single_version(tablet_id, version + 1, txn_id).ok());
ASSERT_TRUE(publish_single_version(tablet_id, version + 1, txn_id).ok());
EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id));
EXPECT_TRUE(_update_mgr->TEST_check_primary_index_cache_ref(tablet_id, 1));
EXPECT_TRUE(_update_mgr->try_remove_primary_index_cache(tablet_id));
version++;
}
SyncPoint::GetInstance()->ClearCallBack(sync_point);
SyncPoint::GetInstance()->DisableProcessing();
ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
EXPECT_EQ(new_tablet_metadata->rowsets_size(), 6);
ASSERT_EQ(kChunkSize, read_rows(tablet_id, version));
if (GetParam().enable_persistent_index && GetParam().persistent_index_type == PersistentIndexTypePB::LOCAL) {
check_local_persistent_index_meta(tablet_id, version);
}
}

TEST_P(LakePrimaryKeyPublishTest, test_write_rebuild_persistent_index) {
if (!GetParam().enable_persistent_index) {
// only test persistent index
Expand Down

0 comments on commit 5483133

Please sign in to comment.