Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema: allow loading empty schema diff when the version grows up. #5245

Merged
merged 15 commits into from
Jul 4, 2022
13 changes: 11 additions & 2 deletions dbms/src/Debug/MockSchemaGetter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,25 @@
#include <Debug/MockTiDB.h>
#include <TiDB/Schema/SchemaGetter.h>

#include <optional>

namespace DB
{

struct MockSchemaGetter
{
TiDB::DBInfoPtr getDatabase(DatabaseID db_id) { return MockTiDB::instance().getDBInfoByID(db_id); }

Int64 getVersion() { return MockTiDB::instance().getVersion(); }

SchemaDiff getSchemaDiff(Int64 version) { return MockTiDB::instance().getSchemaDiff(version); }
std::optional<SchemaDiff> getSchemaDiff(Int64 version)
{
return MockTiDB::instance().getSchemaDiff(version);
}

bool checkSchemaDiffExists(Int64 version)
{
return MockTiDB::instance().checkSchemaDiffExists(version);
}

TiDB::TableInfoPtr getTableInfo(DatabaseID, TableID table_id) { return MockTiDB::instance().getTableInfoByID(table_id); }

Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ int MockTiDB::newTables(
tables_by_id.emplace(table->table_info.id, table);
tables_by_name.emplace(qualified_name, table);

AffectedOption opt;
AffectedOption opt{};
opt.schema_id = table->database_id;
opt.table_id = table->id();
opt.old_schema_id = table->database_id;
Expand Down Expand Up @@ -571,7 +571,7 @@ void MockTiDB::renameTables(const std::vector<std::tuple<std::string, std::strin
tables_by_name.erase(qualified_name);
tables_by_name.emplace(new_qualified_name, new_table);

AffectedOption opt;
AffectedOption opt{};
opt.schema_id = table->database_id;
opt.table_id = new_table->id();
opt.old_schema_id = table->database_id;
Expand Down Expand Up @@ -669,9 +669,14 @@ std::pair<bool, DatabaseID> MockTiDB::getDBIDByName(const String & database_name
return std::make_pair(false, -1);
}

SchemaDiff MockTiDB::getSchemaDiff(Int64 version_)
std::optional<SchemaDiff> MockTiDB::getSchemaDiff(Int64 version_)
{
return version_diff[version_];
}

bool MockTiDB::checkSchemaDiffExists(Int64 version)
{
return version_diff.find(version) != version_diff.end();
}

} // namespace DB
4 changes: 3 additions & 1 deletion dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ class MockTiDB : public ext::Singleton<MockTiDB>

std::pair<bool, DatabaseID> getDBIDByName(const String & database_name);

SchemaDiff getSchemaDiff(Int64 version);
bool checkSchemaDiffExists(Int64 version);

std::optional<SchemaDiff> getSchemaDiff(Int64 version);

std::unordered_map<String, DatabaseID> getDatabases() { return databases; }

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/ReadIndexWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ BatchReadIndexRes ReadIndexWorkerManager::batchReadIndex(
}
}
{ // if meet timeout, which means part of regions can not get response from leader, try to poll rest tasks
TEST_LOG_FMT("rest {}, poll rest tasks onece", tasks.size());
TEST_LOG_FMT("rest {}, poll rest tasks once", tasks.size());

while (!tasks.empty())
{
Expand Down
13 changes: 10 additions & 3 deletions dbms/src/TiDB/Schema/SchemaGetter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

namespace DB
{

namespace ErrorCodes
{
extern const int SCHEMA_SYNC_ERROR;
Expand Down Expand Up @@ -188,18 +187,26 @@ Int64 SchemaGetter::getVersion()
return std::stoll(ver);
}

bool SchemaGetter::checkSchemaDiffExists(Int64 ver)
{
String key = getSchemaDiffKey(ver);
String data = TxnStructure::get(snap, key);
return !data.empty();
}

String SchemaGetter::getSchemaDiffKey(Int64 ver)
{
return std::string(schemaDiffPrefix) + ":" + std::to_string(ver);
}

SchemaDiff SchemaGetter::getSchemaDiff(Int64 ver)
std::optional<SchemaDiff> SchemaGetter::getSchemaDiff(Int64 ver)
{
String key = getSchemaDiffKey(ver);
String data = TxnStructure::get(snap, key);
if (data.empty())
{
throw TiFlashException("cannot find schema diff for version: " + std::to_string(ver), Errors::Table::SyncError);
LOG_FMT_WARNING(log, "The schema diff for version {}, key {} is empty.", ver, key);
return std::nullopt;
}
SchemaDiff diff;
diff.deserialize(data);
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/TiDB/Schema/SchemaGetter.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

#include <common/logger_useful.h>

#include <optional>

namespace DB
{
// The enum results are completely the same as the DDL Action listed in the "parser/model/ddl.go" of TiDB codebase, which must be keeping in sync.
Expand Down Expand Up @@ -138,7 +140,9 @@ struct SchemaGetter

Int64 getVersion();

SchemaDiff getSchemaDiff(Int64 ver);
bool checkSchemaDiffExists(Int64 ver);

std::optional<SchemaDiff> getSchemaDiff(Int64 ver);

static String getSchemaDiffKey(Int64 ver);

Expand Down
70 changes: 54 additions & 16 deletions dbms/src/TiDB/Schema/TiDBSchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,24 @@ struct TiDBSchemaSyncer : public SchemaSyncer
SCOPE_EXIT({ GET_METRIC(tiflash_schema_applying).Set(0.0); });

GET_METRIC(tiflash_schema_apply_count, type_diff).Increment();
if (!tryLoadSchemaDiffs(getter, version, context))
// After the feature concurrent DDL, TiDB does `update schema version` before `set schema diff`, and they are done in separate transactions.
// So TiFlash may see a schema version X but no schema diff X, meaning that the transaction of schema diff X has not been committed or has
// been aborted.
// However, TiDB makes sure that if we get a schema version X, then the schema diff X-1 must exist. Otherwise the transaction of schema diff
// X-1 is aborted and we can safely ignore it.
// Since TiDB can not make sure the schema diff of the latest schema version X is not empty, under this situation we should set the `cur_version`
// to X-1 and try to fetch the schema diff X next time.
Int64 version_after_load_diff = 0;
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
if (version_after_load_diff = tryLoadSchemaDiffs(getter, version, context); version_after_load_diff == -1)
{
GET_METRIC(tiflash_schema_apply_count, type_full).Increment();
loadAllSchema(getter, version, context);
// After loadAllSchema, we need update `version_after_load_diff` by last diff value exist or not
version_after_load_diff = getter.checkSchemaDiffExists(version) ? version : version - 1;
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
}
cur_version = version;
cur_version = version_after_load_diff;
GET_METRIC(tiflash_schema_version).Set(cur_version);
LOG_FMT_INFO(log, "end sync schema, version has been updated to {}", cur_version);
LOG_FMT_INFO(log, "End sync schema, version has been updated to {}{}", cur_version, cur_version == version ? "" : "(latest diff is empty)");
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
return true;
}

Expand All @@ -144,30 +154,57 @@ struct TiDBSchemaSyncer : public SchemaSyncer
return it->second;
}

bool tryLoadSchemaDiffs(Getter & getter, Int64 version, Context & context)
// Return Values
// - if latest schema diff is not empty, return the (latest_version)
// - if latest schema diff is empty, return the (latest_version - 1)
// - if error happend, return (-1)
Int64 tryLoadSchemaDiffs(Getter & getter, Int64 latest_version, Context & context)
{
if (isTooOldSchema(cur_version, version))
if (isTooOldSchema(cur_version, latest_version))
{
return false;
return -1;
}

LOG_FMT_DEBUG(log, "try load schema diffs.");

SchemaBuilder<Getter, NameMapper> builder(getter, context, databases, version);
SchemaBuilder<Getter, NameMapper> builder(getter, context, databases, latest_version);

Int64 used_version = cur_version;
std::vector<SchemaDiff> diffs;
while (used_version < version)
std::vector<std::optional<SchemaDiff>> diffs;
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
while (used_version < latest_version)
{
used_version++;
diffs.push_back(getter.getSchemaDiff(used_version));
}
LOG_FMT_DEBUG(log, "end load schema diffs with total {} entries.", diffs.size());

try
{
for (const auto & diff : diffs)
for (size_t diff_index = 0; diff_index < diffs.size(); diff_index++)
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
{
builder.applyDiff(diff);
const auto & schema_diff = diffs[diff_index];

if (!schema_diff)
{
// If `schema diff` from `latest_version` got empty `schema diff`
// Then we won't apply to `latest_version`, but we will apply to `latest_version - 1`
// If `schema diff` from [`cur_version`, `latest_version - 1`] got empty `schema diff`
// Then we should just skip it.
//
// example:
// - `cur_version` is 1, `latest_version` is 10
// - The schema diff of schema version [2,4,6] is empty, Then we just skip it.
// - The schema diff of schema version 10 is empty, Then we should just apply version into 9
if (diff_index != diffs.size() - 1)
{
LOG_FMT_WARNING(log, "Skip the schema diff from version {}. ", cur_version + diff_index + 1);
continue;
} // else (diff_index == diffs.size() - 1)

jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
return used_version - 1;
}

builder.applyDiff(*schema_diff);
}
}
catch (TiFlashException & e)
Expand All @@ -177,7 +214,7 @@ struct TiDBSchemaSyncer : public SchemaSyncer
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
}
LOG_FMT_WARNING(log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString());
return false;
return -1;
}
catch (Exception & e)
{
Expand All @@ -187,21 +224,22 @@ struct TiDBSchemaSyncer : public SchemaSyncer
}
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
LOG_FMT_WARNING(log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString());
return false;
return -1;
}
catch (Poco::Exception & e)
{
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
LOG_FMT_WARNING(log, "apply diff meets exception : {}", e.displayText());
return false;
return -1;
}
catch (std::exception & e)
{
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
LOG_FMT_WARNING(log, "apply diff meets exception : {}", e.what());
return false;
return -1;
}
return true;

return used_version;
}

void loadAllSchema(Getter & getter, Int64 version, Context & context)
Expand Down