tests: Fix unstable schema test #8972

Merged (4 commits) on Apr 22, 2024
Changes from all commits
dbms/src/TiDB/Schema/SchemaSyncService.cpp (29 changes: 20 additions & 9 deletions)
@@ -42,15 +42,26 @@ SchemaSyncService::SchemaSyncService(DB::Context & context_)
, log(Logger::get())
{
// Add task for adding and removing keyspace sync schema tasks.
handle = background_pool.addTask(
[&, this] {
addKeyspaceGCTasks();
removeKeyspaceGCTasks();

return false;
},
false,
context.getSettingsRef().ddl_sync_interval_seconds * 1000);
auto interval_ms = context.getSettingsRef().ddl_sync_interval_seconds * 1000;
if (interval_ms == 0)
{
LOG_WARNING(
log,
"The background task of SchemaSyncService is disabled, please check the ddl_sync_interval_seconds "
"settings");
}
else
{
handle = background_pool.addTask(
[&, this] {
addKeyspaceGCTasks();
removeKeyspaceGCTasks();

return false;
},
false,
interval_ms);
}
}

void SchemaSyncService::addKeyspaceGCTasks()
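The guard above is what lets a test construct SchemaSyncService without a live timer: when ddl_sync_interval_seconds is 0, no background task is registered at all. Below is a minimal, self-contained sketch of that pattern; BackgroundPool and SyncService are stand-in names for illustration, not the TiFlash API.

#include <cstdint>
#include <functional>
#include <iostream>
#include <optional>

// Stand-in for a background task pool (hypothetical, not the TiFlash BackgroundProcessingPool API).
struct BackgroundPool
{
    using TaskHandle = int;
    TaskHandle addTask(std::function<bool()> /*task*/, uint64_t interval_ms)
    {
        // A real pool would invoke the task every `interval_ms`; here we only record the registration.
        std::cout << "periodic task registered, interval=" << interval_ms << "ms\n";
        return 1;
    }
};

class SyncService
{
public:
    SyncService(BackgroundPool & pool, uint64_t interval_seconds)
    {
        const uint64_t interval_ms = interval_seconds * 1000;
        if (interval_ms == 0)
        {
            // Background work disabled; tests can drive the service manually and deterministically.
            std::cout << "background task disabled\n";
            return;
        }
        handle = pool.addTask([] { return false; }, interval_ms);
    }

    bool hasBackgroundTask() const { return handle.has_value(); }

private:
    std::optional<BackgroundPool::TaskHandle> handle;
};

int main()
{
    BackgroundPool pool;
    SyncService enabled(pool, 60); // registers the periodic task
    SyncService disabled(pool, 0); // skips registration, as the updated tests rely on
    std::cout << std::boolalpha << enabled.hasBackgroundTask() << ' ' << disabled.hasBackgroundTask() << '\n'; // true false
}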
dbms/src/TiDB/Schema/TiDBSchemaManager.h (96 changes: 50 additions & 46 deletions)
@@ -29,31 +29,6 @@ class TiDBSchemaSyncerManager
, log(Logger::get("TiDBSchemaSyncerManager"))
{}

SchemaSyncerPtr createSchemaSyncer(KeyspaceID keyspace_id)
{
if (!mock_getter and !mock_mapper)
{
auto schema_syncer = std::static_pointer_cast<SchemaSyncer>(
std::make_shared<TiDBSchemaSyncer<false, false>>(cluster, keyspace_id));
schema_syncers[keyspace_id] = schema_syncer;
return schema_syncer;
}
else if (mock_getter and mock_mapper)
{
// for mock test
auto schema_syncer = std::static_pointer_cast<SchemaSyncer>(
std::make_shared<TiDBSchemaSyncer<true, true>>(cluster, keyspace_id));
schema_syncers[keyspace_id] = schema_syncer;
return schema_syncer;
}

// for unit test
auto schema_syncer = std::static_pointer_cast<SchemaSyncer>(
std::make_shared<TiDBSchemaSyncer<true, false>>(cluster, keyspace_id));
schema_syncers[keyspace_id] = schema_syncer;
return schema_syncer;
}

bool syncSchemas(Context & context, KeyspaceID keyspace_id)
{
auto schema_syncer = getOrCreateSchemaSyncer(keyspace_id);
@@ -68,8 +43,8 @@ class TiDBSchemaSyncerManager

void reset(KeyspaceID keyspace_id)
{
std::shared_lock<std::shared_mutex> read_lock(schema_syncers_mutex);
auto schema_syncer = getSchemaSyncer(keyspace_id);
std::shared_lock read_lock(schema_syncers_mutex);
auto schema_syncer = getSchemaSyncer(keyspace_id, read_lock);
if (schema_syncer == nullptr)
{
LOG_ERROR(log, "SchemaSyncer not found, keyspace={}", keyspace_id);
@@ -80,8 +55,8 @@

TiDB::DBInfoPtr getDBInfoByName(KeyspaceID keyspace_id, const String & database_name)
{
std::shared_lock<std::shared_mutex> read_lock(schema_syncers_mutex);
auto schema_syncer = getSchemaSyncer(keyspace_id);
std::shared_lock read_lock(schema_syncers_mutex);
auto schema_syncer = getSchemaSyncer(keyspace_id, read_lock);
if (schema_syncer == nullptr)
{
LOG_ERROR(log, "SchemaSyncer not found, keyspace={}", keyspace_id);
@@ -92,8 +67,8 @@

bool removeSchemaSyncer(KeyspaceID keyspace_id)
{
std::unique_lock<std::shared_mutex> lock(schema_syncers_mutex);
auto schema_syncer = getSchemaSyncer(keyspace_id);
std::unique_lock lock(schema_syncers_mutex);
auto schema_syncer = getSchemaSyncer(keyspace_id, lock);
if (schema_syncer == nullptr)
{
LOG_ERROR(log, "SchemaSyncer not found, keyspace={}", keyspace_id);
@@ -105,8 +80,8 @@

void removeTableID(KeyspaceID keyspace_id, TableID table_id)
{
std::shared_lock<std::shared_mutex> read_lock(schema_syncers_mutex);
auto schema_syncer = getSchemaSyncer(keyspace_id);
std::shared_lock read_lock(schema_syncers_mutex);
auto schema_syncer = getSchemaSyncer(keyspace_id, read_lock);
if (schema_syncer == nullptr)
{
LOG_ERROR(log, "SchemaSyncer not found, keyspace={}", keyspace_id);
@@ -126,30 +101,59 @@

std::unordered_map<KeyspaceID, SchemaSyncerPtr> schema_syncers;

/// the function is not thread safe, should be called with a lock
SchemaSyncerPtr getSchemaSyncer(KeyspaceID keyspace_id)
private:
/// Try to get the SchemaSyncer for the `keyspace_id`. Returns nullptr
/// if it does not exist.
/// Note: the function is not thread safe, should be called with a lock
template <typename Lock>
SchemaSyncerPtr getSchemaSyncer(KeyspaceID keyspace_id, Lock & /*lock*/)
{
auto syncer = schema_syncers.find(keyspace_id);
return syncer == schema_syncers.end() ? nullptr : syncer->second;
}

/// Try to get the SchemaSyncer for the `keyspace_id`. Create a SchemaSyncer
/// if it does not exist.
SchemaSyncerPtr getOrCreateSchemaSyncer(KeyspaceID keyspace_id)
{
std::shared_lock<std::shared_mutex> read_lock(schema_syncers_mutex);
auto syncer = schema_syncers.find(keyspace_id);
if (syncer == schema_syncers.end())
{
read_lock.unlock();
std::unique_lock<std::shared_mutex> write_lock(schema_syncers_mutex);

syncer = schema_syncers.find(keyspace_id);
if (syncer == schema_syncers.end())
std::shared_lock read_lock(schema_syncers_mutex);
if (auto iter = schema_syncers.find(keyspace_id); iter != schema_syncers.end())
{
return createSchemaSyncer(keyspace_id);
return iter->second;
}
return syncer->second;
}
return syncer->second;

// release the read_lock and acquire a write_lock
std::unique_lock write_lock(schema_syncers_mutex);
// check again whether other thread has created for the keyspace_id
// after `write_lock` acquired
if (auto iter = schema_syncers.find(keyspace_id); iter != schema_syncers.end())
{
return iter->second;
}
auto syncer = createSchemaSyncer(keyspace_id);
schema_syncers[keyspace_id] = syncer; // register to the syncers
return syncer;
}

SchemaSyncerPtr createSchemaSyncer(KeyspaceID keyspace_id)
{
if (!mock_getter && !mock_mapper)
{
return std::static_pointer_cast<SchemaSyncer>(
std::make_shared<TiDBSchemaSyncer<false, false>>(cluster, keyspace_id));
}
else if (mock_getter && mock_mapper)
{
// for mock test
return std::static_pointer_cast<SchemaSyncer>(
std::make_shared<TiDBSchemaSyncer<true, true>>(cluster, keyspace_id));
}

// for unit test
return std::static_pointer_cast<SchemaSyncer>(
std::make_shared<TiDBSchemaSyncer<true, false>>(cluster, keyspace_id));
}
};
} // namespace DB
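The rewritten getOrCreateSchemaSyncer follows the check / lock / re-check idiom over a std::shared_mutex: a shared lock for the common lookup, then an exclusive lock with a second lookup before creating anything. A self-contained sketch of that idiom, using generic names rather than the TiFlash types, might look like this.

#include <cassert>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

// Stand-in for a per-keyspace schema syncer (hypothetical, not the TiFlash type).
struct Syncer
{
    explicit Syncer(int keyspace_id_) : keyspace_id(keyspace_id_) {}
    int keyspace_id;
};

class SyncerRegistry
{
public:
    std::shared_ptr<Syncer> getOrCreate(int keyspace_id)
    {
        {
            // Fast path: most lookups succeed under a shared (reader) lock.
            std::shared_lock read_lock(mutex);
            if (auto it = syncers.find(keyspace_id); it != syncers.end())
                return it->second;
        } // shared lock released here; another thread may insert before we relock

        // Slow path: take the exclusive lock, then check again, because a
        // concurrent caller may have created the syncer between the two locks.
        std::unique_lock write_lock(mutex);
        if (auto it = syncers.find(keyspace_id); it != syncers.end())
            return it->second;
        auto syncer = std::make_shared<Syncer>(keyspace_id);
        syncers.emplace(keyspace_id, syncer);
        return syncer;
    }

private:
    std::shared_mutex mutex;
    std::unordered_map<int, std::shared_ptr<Syncer>> syncers;
};

int main()
{
    SyncerRegistry registry;
    auto a = registry.getOrCreate(1);
    auto b = registry.getOrCreate(1);
    assert(a == b); // the same syncer instance is returned for a given keyspace
}

Passing the held lock into helpers such as getSchemaSyncer(keyspace_id, lock), as the change above does, costs nothing at runtime but makes callers demonstrate at the call site that they already hold schema_syncers_mutex.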
dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp (42 changes: 25 additions & 17 deletions)
@@ -72,6 +72,14 @@ class SchemaSyncTest : public ::testing::Test

void SetUp() override
{
// unit test.
// Get DBInfo/TableInfo from MockTiDB, but create table with names `t_${table_id}`
auto cluster = std::make_shared<pingcap::kv::Cluster>();
schema_sync_manager = std::make_unique<TiDBSchemaSyncerManager>(
cluster,
/*mock_getter*/ true,
/*mock_mapper*/ false);

// disable schema sync timer
global_ctx.getSchemaSyncService().reset();
recreateMetadataPath();
@@ -97,11 +105,9 @@ class SchemaSyncTest : public ::testing::Test
// Sync schema info from TiDB/MockTiDB to TiFlash
void refreshSchema()
{
auto & flash_ctx = global_ctx.getTMTContext();
auto schema_syncer = flash_ctx.getSchemaSyncerManager();
try
{
schema_syncer->syncSchemas(global_ctx, NullspaceID);
schema_sync_manager->syncSchemas(global_ctx, NullspaceID);
}
catch (Exception & e)
{
@@ -118,11 +124,9 @@ class SchemaSyncTest : public ::testing::Test

void refreshTableSchema(TableID table_id)
{
auto & flash_ctx = global_ctx.getTMTContext();
auto schema_syncer = flash_ctx.getSchemaSyncerManager();
try
{
schema_syncer->syncTableSchema(global_ctx, NullspaceID, table_id);
schema_sync_manager->syncTableSchema(global_ctx, NullspaceID, table_id);
}
catch (Exception & e)
{
@@ -138,11 +142,7 @@ class SchemaSyncTest : public ::testing::Test
}

// Reset the schema syncer to mock TiFlash shutdown
void resetSchemas()
{
auto & flash_ctx = global_ctx.getTMTContext();
flash_ctx.getSchemaSyncerManager()->reset(NullspaceID);
}
void resetSchemas() { schema_sync_manager->reset(NullspaceID); }

// Get the TiFlash synced table
ManageableStoragePtr mustGetSyncedTable(TableID table_id)
@@ -207,6 +207,8 @@ class SchemaSyncTest : public ::testing::Test

protected:
Context & global_ctx;

std::unique_ptr<TiDBSchemaSyncerManager> schema_sync_manager;
};

TEST_F(SchemaSyncTest, SchemaDiff)
@@ -300,7 +302,12 @@ try
refreshTableSchema(table_id);
}

auto sync_service = std::make_shared<SchemaSyncService>(global_ctx);
// Create a temporary context with ddl sync task disabled
auto ctx = DB::tests::TiFlashTestEnv::getContext();
ctx->getSettingsRef().ddl_sync_interval_seconds = 0;
auto sync_service = std::make_shared<SchemaSyncService>(*ctx);
sync_service->shutdown(); // shutdown the background tasks

// run gc with safepoint == 0, will be skipped
ASSERT_FALSE(sync_service->gc(0, NullspaceID));
ASSERT_TRUE(sync_service->gc(10000000, NullspaceID));
@@ -312,8 +319,6 @@ try
ASSERT_TRUE(sync_service->gc(20000000, 1024));
// run gc with the same safepoint
ASSERT_FALSE(sync_service->gc(20000000, 1024));

sync_service->shutdown();
}
CATCH

@@ -357,7 +362,12 @@ try
std::vector<RegionID>{1001, 1002, 1003});
SCOPE_EXIT({ FailPointHelper::disableFailPoint(FailPoints::force_set_num_regions_for_table); });

auto sync_service = std::make_shared<SchemaSyncService>(global_ctx);
// Create a temporary context with ddl sync task disabled
auto ctx = DB::tests::TiFlashTestEnv::getContext();
ctx->getSettingsRef().ddl_sync_interval_seconds = 0;
auto sync_service = std::make_shared<SchemaSyncService>(*ctx);
sync_service->shutdown(); // shutdown the background tasks

{
// ensure gc_safe_point cache is empty
auto last_gc_safe_point = lastGcSafePoint(sync_service, NullspaceID);
Expand All @@ -381,8 +391,6 @@ try
++num_remain_tables;
}
ASSERT_EQ(num_remain_tables, 1);

sync_service->shutdown();
}
CATCH

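The fixture changes above follow a broader pattern: each test owns the object under test (built fresh in SetUp) instead of reaching through global context, and background timers are disabled so assertions run deterministically. A small self-contained GoogleTest sketch of that pattern, with generic names rather than the TiFlash test code:

#include <gtest/gtest.h>
#include <memory>

// Stand-in for the component under test (hypothetical).
struct Manager
{
    int sync_count = 0;
    void sync() { ++sync_count; }
    void reset() { sync_count = 0; }
};

class ManagerTest : public ::testing::Test
{
protected:
    void SetUp() override
    {
        // Each test gets its own instance; no state shared between tests through globals.
        manager = std::make_unique<Manager>();
    }

    std::unique_ptr<Manager> manager;
};

TEST_F(ManagerTest, SyncThenReset)
{
    manager->sync();
    EXPECT_EQ(manager->sync_count, 1);
    manager->reset();
    EXPECT_EQ(manager->sync_count, 0);
}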