Skip to content

Commit

Permalink
KVStore: Reduce lock contention in RegionPersister::doPersist (rele…
Browse files Browse the repository at this point in the history
…ase-6.5) (#8593)

close #8583
  • Loading branch information
JaySon-Huang authored Dec 26, 2023
1 parent 26e400d commit 1226495
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 5 deletions.
6 changes: 4 additions & 2 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ namespace DB
M(pause_before_apply_raft_snapshot) \
M(pause_until_apply_raft_snapshot) \
M(pause_after_copr_streams_acquired_once) \
M(pause_before_register_non_root_mpp_task)
M(pause_before_register_non_root_mpp_task) \
M(pause_when_persist_region)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS(M) \
M(pause_when_reading_from_dt_stream) \
Expand All @@ -131,7 +132,8 @@ namespace DB
M(random_sharedquery_failpoint) \
M(random_interpreter_failpoint) \
M(random_task_manager_find_task_failure_failpoint) \
M(random_min_tso_scheduler_failpoint)
M(random_min_tso_scheduler_failpoint) \
M(random_region_persister_latency_failpoint)

namespace FailPoints
{
Expand Down
28 changes: 26 additions & 2 deletions dbms/src/Storages/Transaction/RegionPersister.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/SyncPoint/SyncPoint.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <Interpreters/Context.h>
#include <Interpreters/Settings.h>
Expand Down Expand Up @@ -46,6 +47,8 @@ namespace FailPoints
{
extern const char force_enable_region_persister_compatible_mode[];
extern const char force_disable_region_persister_compatible_mode[];
extern const char pause_when_persist_region[];
extern const char random_region_persister_latency_failpoint[];
} // namespace FailPoints

void RegionPersister::drop(RegionID region_id, const RegionTaskLock &)
Expand Down Expand Up @@ -107,8 +110,6 @@ void RegionPersister::doPersist(RegionCacheWriteElement & region_write_buffer, c
{
auto & [region_id, buffer, region_size, applied_index] = region_write_buffer;

std::lock_guard lock(mutex);

if (page_reader)
{
auto entry = page_reader->getPageEntry(region_id);
Expand Down Expand Up @@ -142,6 +143,29 @@ void RegionPersister::doPersist(RegionCacheWriteElement & region_write_buffer, c
wb.putPage(region_id, applied_index, read_buf, region_size);
stable_page_storage->write(std::move(wb));
}

#ifdef FIU_ENABLE
fiu_do_on(FailPoints::pause_when_persist_region, {
if (auto v = FailPointHelper::getFailPointVal(FailPoints::pause_when_persist_region); v)
{
// Only pause for the given region_id
auto pause_region_id = std::any_cast<RegionID>(v.value());
if (region_id == pause_region_id)
{
SYNC_FOR("before_RegionPersister::persist_write_done");
}
}
else
{
// Pause for all persisting requests
SYNC_FOR("before_RegionPersister::persist_write_done");
}
});
fiu_do_on(FailPoints::random_region_persister_latency_failpoint, {
using namespace std::chrono_literals;
std::this_thread::sleep_for(1ms);
});
#endif
}

RegionPersister::RegionPersister(Context & global_context_, const RegionManager & region_manager_)
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/Transaction/RegionPersister.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ class RegionPersister final : private boost::noncopyable

NamespaceId ns_id = KVSTORE_NAMESPACE_ID;
const RegionManager & region_manager;
std::mutex mutex;
LoggerPtr log;
};
} // namespace DB
73 changes: 73 additions & 0 deletions dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/FailPoint.h>
#include <Common/Stopwatch.h>
#include <Common/SyncPoint/Ctl.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <RaftStoreProxyFFI/ColumnFamily.h>
Expand All @@ -29,13 +30,16 @@
#include <common/logger_useful.h>

#include <ext/scope_guard.h>
#include <future>

namespace DB
{
namespace FailPoints
{
extern const char force_enable_region_persister_compatible_mode[];
extern const char force_disable_region_persister_compatible_mode[];
extern const char force_region_persist_version[];
extern const char pause_when_persist_region[];
} // namespace FailPoints

namespace tests
Expand Down Expand Up @@ -239,6 +243,75 @@ class RegionPersisterTest : public ::testing::Test
LoggerPtr log;
};

TEST_F(RegionPersisterTest, Concurrency)
try
{
RegionManager region_manager;

auto ctx = TiFlashTestEnv::getGlobalContext();

RegionMap regions;
const TableID table_id = 100;

PageStorageConfig config;
config.file_roll_size = 128 * MB;

UInt64 diff = 0;
RegionPersister persister(ctx, region_manager);
persister.restore(*mocked_path_pool, nullptr, config);

// Persist region by region
const RegionID region_100 = 100;
FailPointHelper::enableFailPoint(FailPoints::pause_when_persist_region, region_100);
SCOPE_EXIT({ FailPointHelper::disableFailPoint(FailPoints::pause_when_persist_region); });

auto sp_persist_region_100 = SyncPointCtl::enableInScope("before_RegionPersister::persist_write_done");
auto th_persist_region_100 = std::async([&]() {
auto region_task_lock = region_manager.genRegionTaskLock(region_100);

auto region = std::make_shared<Region>(createRegionMeta(region_100, table_id));
TiKVKey key = RecordKVFormat::genKey(table_id, region_100, diff++);
region->insert(ColumnFamilyType::Default, TiKVKey::copyFrom(key), TiKVValue("value1"));
region->insert(ColumnFamilyType::Write, TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0));
region->insert(
ColumnFamilyType::Lock,
TiKVKey::copyFrom(key),
RecordKVFormat::encodeLockCfValue('P', "", 0, 0));

persister.persist(*region, region_task_lock);

regions.emplace(region->id(), region);
});
LOG_INFO(log, "paused before persisting region 100");
sp_persist_region_100.waitAndPause();

LOG_INFO(log, "before persisting region 101");
const RegionID region_101 = 101;
{
auto region_task_lock = region_manager.genRegionTaskLock(region_101);

auto region = std::make_shared<Region>(createRegionMeta(region_101, table_id));
TiKVKey key = RecordKVFormat::genKey(table_id, region_101, diff++);
region->insert(ColumnFamilyType::Default, TiKVKey::copyFrom(key), TiKVValue("value1"));
region->insert(ColumnFamilyType::Write, TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0));
region->insert(
ColumnFamilyType::Lock,
TiKVKey::copyFrom(key),
RecordKVFormat::encodeLockCfValue('P', "", 0, 0));

persister.persist(*region, region_task_lock);

regions.emplace(region->id(), region);
}
LOG_INFO(log, "after persisting region 101");

sp_persist_region_100.next();
th_persist_region_100.get();

LOG_INFO(log, "finished");
}
CATCH

TEST_F(RegionPersisterTest, persister)
try
{
Expand Down

0 comments on commit 1226495

Please sign in to comment.