Skip to content

Commit

Permalink
tests: Fix unstable schema test (#8972)
Browse files Browse the repository at this point in the history
close #8911, close #8968
  • Loading branch information
JaySon-Huang authored Apr 22, 2024
1 parent 2c0db29 commit 4412105
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 72 deletions.
29 changes: 20 additions & 9 deletions dbms/src/TiDB/Schema/SchemaSyncService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,26 @@ SchemaSyncService::SchemaSyncService(DB::Context & context_)
, log(Logger::get())
{
// Add task for adding and removing keyspace sync schema tasks.
handle = background_pool.addTask(
[&, this] {
addKeyspaceGCTasks();
removeKeyspaceGCTasks();

return false;
},
false,
context.getSettingsRef().ddl_sync_interval_seconds * 1000);
auto interval_ms = context.getSettingsRef().ddl_sync_interval_seconds * 1000;
if (interval_ms == 0)
{
LOG_WARNING(
log,
"The background task of SchemaSyncService is disabled, please check the ddl_sync_interval_seconds "
"settings");
}
else
{
handle = background_pool.addTask(
[&, this] {
addKeyspaceGCTasks();
removeKeyspaceGCTasks();

return false;
},
false,
interval_ms);
}
}

void SchemaSyncService::addKeyspaceGCTasks()
Expand Down
96 changes: 50 additions & 46 deletions dbms/src/TiDB/Schema/TiDBSchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,6 @@ class TiDBSchemaSyncerManager
, log(Logger::get("TiDBSchemaSyncerManager"))
{}

SchemaSyncerPtr createSchemaSyncer(KeyspaceID keyspace_id)
{
if (!mock_getter and !mock_mapper)
{
auto schema_syncer = std::static_pointer_cast<SchemaSyncer>(
std::make_shared<TiDBSchemaSyncer<false, false>>(cluster, keyspace_id));
schema_syncers[keyspace_id] = schema_syncer;
return schema_syncer;
}
else if (mock_getter and mock_mapper)
{
// for mock test
auto schema_syncer = std::static_pointer_cast<SchemaSyncer>(
std::make_shared<TiDBSchemaSyncer<true, true>>(cluster, keyspace_id));
schema_syncers[keyspace_id] = schema_syncer;
return schema_syncer;
}

// for unit test
auto schema_syncer = std::static_pointer_cast<SchemaSyncer>(
std::make_shared<TiDBSchemaSyncer<true, false>>(cluster, keyspace_id));
schema_syncers[keyspace_id] = schema_syncer;
return schema_syncer;
}

bool syncSchemas(Context & context, KeyspaceID keyspace_id)
{
auto schema_syncer = getOrCreateSchemaSyncer(keyspace_id);
Expand All @@ -68,8 +43,8 @@ class TiDBSchemaSyncerManager

void reset(KeyspaceID keyspace_id)
{
std::shared_lock<std::shared_mutex> read_lock(schema_syncers_mutex);
auto schema_syncer = getSchemaSyncer(keyspace_id);
std::shared_lock read_lock(schema_syncers_mutex);
auto schema_syncer = getSchemaSyncer(keyspace_id, read_lock);
if (schema_syncer == nullptr)
{
LOG_ERROR(log, "SchemaSyncer not found, keyspace={}", keyspace_id);
Expand All @@ -80,8 +55,8 @@ class TiDBSchemaSyncerManager

TiDB::DBInfoPtr getDBInfoByName(KeyspaceID keyspace_id, const String & database_name)
{
std::shared_lock<std::shared_mutex> read_lock(schema_syncers_mutex);
auto schema_syncer = getSchemaSyncer(keyspace_id);
std::shared_lock read_lock(schema_syncers_mutex);
auto schema_syncer = getSchemaSyncer(keyspace_id, read_lock);
if (schema_syncer == nullptr)
{
LOG_ERROR(log, "SchemaSyncer not found, keyspace={}", keyspace_id);
Expand All @@ -92,8 +67,8 @@ class TiDBSchemaSyncerManager

bool removeSchemaSyncer(KeyspaceID keyspace_id)
{
std::unique_lock<std::shared_mutex> lock(schema_syncers_mutex);
auto schema_syncer = getSchemaSyncer(keyspace_id);
std::unique_lock lock(schema_syncers_mutex);
auto schema_syncer = getSchemaSyncer(keyspace_id, lock);
if (schema_syncer == nullptr)
{
LOG_ERROR(log, "SchemaSyncer not found, keyspace={}", keyspace_id);
Expand All @@ -105,8 +80,8 @@ class TiDBSchemaSyncerManager

void removeTableID(KeyspaceID keyspace_id, TableID table_id)
{
std::shared_lock<std::shared_mutex> read_lock(schema_syncers_mutex);
auto schema_syncer = getSchemaSyncer(keyspace_id);
std::shared_lock read_lock(schema_syncers_mutex);
auto schema_syncer = getSchemaSyncer(keyspace_id, read_lock);
if (schema_syncer == nullptr)
{
LOG_ERROR(log, "SchemaSyncer not found, keyspace={}", keyspace_id);
Expand All @@ -126,30 +101,59 @@ class TiDBSchemaSyncerManager

std::unordered_map<KeyspaceID, SchemaSyncerPtr> schema_syncers;

/// the function is not thread safe, should be called with a lock
SchemaSyncerPtr getSchemaSyncer(KeyspaceID keyspace_id)
private:
/// Try to get the SchemaSyncer for the `keyspace_id`. Returns nullptr
/// if there is not exist.
/// Note: the function is not thread safe, should be called with a lock
template <typename Lock>
SchemaSyncerPtr getSchemaSyncer(KeyspaceID keyspace_id, Lock & /*lock*/)
{
auto syncer = schema_syncers.find(keyspace_id);
return syncer == schema_syncers.end() ? nullptr : syncer->second;
}

/// Try to get the SchemaSyncer for the `keyspace_id`. Create a SchemaSyncer
/// if there is not exist.
SchemaSyncerPtr getOrCreateSchemaSyncer(KeyspaceID keyspace_id)
{
std::shared_lock<std::shared_mutex> read_lock(schema_syncers_mutex);
auto syncer = schema_syncers.find(keyspace_id);
if (syncer == schema_syncers.end())
{
read_lock.unlock();
std::unique_lock<std::shared_mutex> write_lock(schema_syncers_mutex);

syncer = schema_syncers.find(keyspace_id);
if (syncer == schema_syncers.end())
std::shared_lock read_lock(schema_syncers_mutex);
if (auto iter = schema_syncers.find(keyspace_id); iter != schema_syncers.end())
{
return createSchemaSyncer(keyspace_id);
return iter->second;
}
return syncer->second;
}
return syncer->second;

// release the read_lock and acquire a write_lock
std::unique_lock write_lock(schema_syncers_mutex);
// check again whether other thread has created for the keyspace_id
// after `write_lock` acquired
if (auto iter = schema_syncers.find(keyspace_id); iter != schema_syncers.end())
{
return iter->second;
}
auto syncer = createSchemaSyncer(keyspace_id);
schema_syncers[keyspace_id] = syncer; // register to the syncers
return syncer;
}

SchemaSyncerPtr createSchemaSyncer(KeyspaceID keyspace_id)
{
if (!mock_getter && !mock_mapper)
{
return std::static_pointer_cast<SchemaSyncer>(
std::make_shared<TiDBSchemaSyncer<false, false>>(cluster, keyspace_id));
}
else if (mock_getter && mock_mapper)
{
// for mock test
return std::static_pointer_cast<SchemaSyncer>(
std::make_shared<TiDBSchemaSyncer<true, true>>(cluster, keyspace_id));
}

// for unit test
return std::static_pointer_cast<SchemaSyncer>(
std::make_shared<TiDBSchemaSyncer<true, false>>(cluster, keyspace_id));
}
};
} // namespace DB
42 changes: 25 additions & 17 deletions dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ class SchemaSyncTest : public ::testing::Test

void SetUp() override
{
// unit test.
// Get DBInfo/TableInfo from MockTiDB, but create table with names `t_${table_id}`
auto cluster = std::make_shared<pingcap::kv::Cluster>();
schema_sync_manager = std::make_unique<TiDBSchemaSyncerManager>(
cluster,
/*mock_getter*/ true,
/*mock_mapper*/ false);

// disable schema sync timer
global_ctx.getSchemaSyncService().reset();
recreateMetadataPath();
Expand All @@ -97,11 +105,9 @@ class SchemaSyncTest : public ::testing::Test
// Sync schema info from TiDB/MockTiDB to TiFlash
void refreshSchema()
{
auto & flash_ctx = global_ctx.getTMTContext();
auto schema_syncer = flash_ctx.getSchemaSyncerManager();
try
{
schema_syncer->syncSchemas(global_ctx, NullspaceID);
schema_sync_manager->syncSchemas(global_ctx, NullspaceID);
}
catch (Exception & e)
{
Expand All @@ -118,11 +124,9 @@ class SchemaSyncTest : public ::testing::Test

void refreshTableSchema(TableID table_id)
{
auto & flash_ctx = global_ctx.getTMTContext();
auto schema_syncer = flash_ctx.getSchemaSyncerManager();
try
{
schema_syncer->syncTableSchema(global_ctx, NullspaceID, table_id);
schema_sync_manager->syncTableSchema(global_ctx, NullspaceID, table_id);
}
catch (Exception & e)
{
Expand All @@ -138,11 +142,7 @@ class SchemaSyncTest : public ::testing::Test
}

// Reset the schema syncer to mock TiFlash shutdown
void resetSchemas()
{
auto & flash_ctx = global_ctx.getTMTContext();
flash_ctx.getSchemaSyncerManager()->reset(NullspaceID);
}
void resetSchemas() { schema_sync_manager->reset(NullspaceID); }

// Get the TiFlash synced table
ManageableStoragePtr mustGetSyncedTable(TableID table_id)
Expand Down Expand Up @@ -207,6 +207,8 @@ class SchemaSyncTest : public ::testing::Test

protected:
Context & global_ctx;

std::unique_ptr<TiDBSchemaSyncerManager> schema_sync_manager;
};

TEST_F(SchemaSyncTest, SchemaDiff)
Expand Down Expand Up @@ -300,7 +302,12 @@ try
refreshTableSchema(table_id);
}

auto sync_service = std::make_shared<SchemaSyncService>(global_ctx);
// Create a temporary context with ddl sync task disabled
auto ctx = DB::tests::TiFlashTestEnv::getContext();
ctx->getSettingsRef().ddl_sync_interval_seconds = 0;
auto sync_service = std::make_shared<SchemaSyncService>(*ctx);
sync_service->shutdown(); // shutdown the background tasks

// run gc with safepoint == 0, will be skip
ASSERT_FALSE(sync_service->gc(0, NullspaceID));
ASSERT_TRUE(sync_service->gc(10000000, NullspaceID));
Expand All @@ -312,8 +319,6 @@ try
ASSERT_TRUE(sync_service->gc(20000000, 1024));
// run gc with the same safepoint
ASSERT_FALSE(sync_service->gc(20000000, 1024));

sync_service->shutdown();
}
CATCH

Expand Down Expand Up @@ -357,7 +362,12 @@ try
std::vector<RegionID>{1001, 1002, 1003});
SCOPE_EXIT({ FailPointHelper::disableFailPoint(FailPoints::force_set_num_regions_for_table); });

auto sync_service = std::make_shared<SchemaSyncService>(global_ctx);
// Create a temporary context with ddl sync task disabled
auto ctx = DB::tests::TiFlashTestEnv::getContext();
ctx->getSettingsRef().ddl_sync_interval_seconds = 0;
auto sync_service = std::make_shared<SchemaSyncService>(*ctx);
sync_service->shutdown(); // shutdown the background tasks

{
// ensure gc_safe_point cache is empty
auto last_gc_safe_point = lastGcSafePoint(sync_service, NullspaceID);
Expand All @@ -381,8 +391,6 @@ try
++num_remain_tables;
}
ASSERT_EQ(num_remain_tables, 1);

sync_service->shutdown();
}
CATCH

Expand Down

0 comments on commit 4412105

Please sign in to comment.