From 12faa8f7d73a25287bb5754326c7fcd8bca905a0 Mon Sep 17 00:00:00 2001 From: ruoxi Date: Fri, 10 Jan 2020 13:39:24 +0800 Subject: [PATCH] FLASH-828: Use schema version in cop request (#388) * Check schema version in cop * Refine get and lock table * Fix format --- .clang-format | 1 + contrib/kvproto | 2 +- dbms/src/Debug/dbgFuncCoprocessor.cpp | 4 +- dbms/src/Flash/Coprocessor/DAGDriver.cpp | 8 ++-- dbms/src/Flash/Coprocessor/DAGDriver.h | 5 ++- dbms/src/Flash/Coprocessor/InterpreterDAG.cpp | 43 +++++++++++-------- dbms/src/Flash/CoprocessorHandler.cpp | 7 ++- .../Interpreters/InterpreterSelectQuery.cpp | 27 ++++++++---- 8 files changed, 59 insertions(+), 38 deletions(-) diff --git a/.clang-format b/.clang-format index b192117c7c6..5e988172ed8 100644 --- a/.clang-format +++ b/.clang-format @@ -4,6 +4,7 @@ Language: Cpp AlignAfterOpenBracket: false BreakBeforeBraces: Custom BraceWrapping: { + AfterCaseLabel: 'true' AfterClass: 'true' AfterControlStatement: 'true' AfterEnum : 'true' diff --git a/contrib/kvproto b/contrib/kvproto index 393e6c0fd4b..80122c79a66 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit 393e6c0fd4b76a6fe6f8196eb3011ee3daa1ac75 +Subproject commit 80122c79a6646d84788559c14ea2bfdfde5812ff diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index 03e870b1c74..b5923701cf9 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -666,8 +666,8 @@ tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest static Logger * log = &Logger::get("MockDAG"); LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString()); tipb::SelectResponse dag_response; - DAGDriver driver( - context, dag_request, region_id, region_version, region_conf_version, start_ts, std::move(key_ranges), dag_response, true); + DAGDriver driver(context, dag_request, region_id, region_version, region_conf_version, start_ts, 
DEFAULT_UNSPECIFIED_SCHEMA_VERSION, + std::move(key_ranges), dag_response, true); driver.execute(); LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done"); return dag_response; diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.cpp b/dbms/src/Flash/Coprocessor/DAGDriver.cpp index c693915d2a8..ee5ea3ccb41 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.cpp +++ b/dbms/src/Flash/Coprocessor/DAGDriver.cpp @@ -1,10 +1,9 @@ -#include - #include #include #include #include #include +#include #include #include #include @@ -23,7 +22,7 @@ extern const int UNKNOWN_EXCEPTION; } // namespace ErrorCodes DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, RegionID region_id_, UInt64 region_version_, - UInt64 region_conf_version_, UInt64 start_ts, std::vector> && key_ranges_, + UInt64 region_conf_version_, UInt64 start_ts, UInt64 schema_ver, std::vector> && key_ranges_, tipb::SelectResponse & dag_response_, bool internal_) : context(context_), dag_request(dag_request_), @@ -36,6 +35,9 @@ DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, log(&Logger::get("DAGDriver")) { context.setSetting("read_tso", start_ts); + if (schema_ver) + // schema_ver being 0 means TiDB/TiSpark hasn't specified schema version. 
+ context.setSetting("schema_version", schema_ver); } void DAGDriver::execute() diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.h b/dbms/src/Flash/Coprocessor/DAGDriver.h index 32d06e661b9..cd74b5d6ad8 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.h +++ b/dbms/src/Flash/Coprocessor/DAGDriver.h @@ -16,8 +16,9 @@ class DAGDriver { public: DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, RegionID region_id_, UInt64 region_version_, - UInt64 region_conf_version_, UInt64 start_ts, std::vector> && key_ranges_, - tipb::SelectResponse & dag_response_, bool internal_ = false); + UInt64 region_conf_version_, UInt64 start_ts, UInt64 schema_ver, + std::vector> && key_ranges_, tipb::SelectResponse & dag_response_, + bool internal_ = false); void execute(); diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index 290fed448b5..f273003f860 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -1,5 +1,3 @@ -#include - #include #include #include @@ -16,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -46,7 +45,8 @@ extern const int COP_BAD_DAG_REQUEST; InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_) : context(context_), dag(dag_), - keep_session_timezone_info(dag.getEncodeType() == tipb::EncodeType::TypeChunk || dag.getEncodeType() == tipb::EncodeType::TypeCHBlock), + keep_session_timezone_info( + dag.getEncodeType() == tipb::EncodeType::TypeChunk || dag.getEncodeType() == tipb::EncodeType::TypeCHBlock), log(&Logger::get("InterpreterDAG")) {} @@ -161,8 +161,10 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) throw Exception("Table id not specified in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST); } TableID table_id = ts.table_id(); - // TODO: Get schema version from DAG request. 
- if (context.getSettingsRef().schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION) + + const Settings & settings = context.getSettingsRef(); + + if (settings.schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION) { storage = context.getTMTContext().getStorages().get(table_id); if (storage == nullptr) @@ -173,7 +175,7 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) } else { - getAndLockStorageWithSchemaVersion(table_id, DEFAULT_UNSPECIFIED_SCHEMA_VERSION); + getAndLockStorageWithSchemaVersion(table_id, settings.schema_version); } Names required_columns; @@ -228,8 +230,6 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) } } // todo handle alias column - const Settings & settings = context.getSettingsRef(); - if (settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read) { throw Exception("Limit for number of columns to read exceeded. " @@ -486,21 +486,30 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64 /// Lock storage. auto lock = storage_->lockStructure(false, __PRETTY_FUNCTION__); - /// Check schema version. + /// Check schema version, requiring TiDB/TiSpark and TiFlash both use exactly the same schema. + // We have three schema versions, two in TiFlash: + // 1. Storage: the version that this TiFlash table (storage) was last altered. + // 2. Global: the version that TiFlash global schema is at. + // And one from TiDB/TiSpark: + // 3. Query: the version that TiDB/TiSpark used for this query. auto storage_schema_version = storage_->getTableInfo().schema_version; - // Not allow storage schema version greater than query schema version in any case. + // Not allow storage > query in any case, one example is time travel queries. 
if (storage_schema_version > query_schema_version) throw Exception("Table " + std::to_string(table_id) + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(query_schema_version), ErrorCodes::SCHEMA_VERSION_ERROR); - - // If schema synced, we must be very recent so we are good as long as storage schema version is no greater than query schema version. - // If schema not synced, we are good if storage schema version is right on query schema version. - // Otherwise we are at the risk of out-of-date schema, but we still have a chance to be sure that we are good, if global schema version is greater than query schema version. - if ((schema_synced && storage_schema_version <= query_schema_version) - || (!schema_synced && (storage_schema_version == query_schema_version || global_schema_version > query_schema_version))) + // From now on we have storage <= query. + // If schema was synced, it implies that global >= query, as mentioned above we have storage <= query, we are OK to serve. + if (schema_synced) return std::make_tuple(storage_, lock, storage_schema_version, true); - + // From now on the schema was not synced. + // 1. storage == query, TiDB/TiSpark is using exactly the same schema that altered this table, we are just OK to serve. + // 2. global >= query, TiDB/TiSpark is using a schema older than TiFlash global, but as mentioned above we have storage <= query, + // meaning that the query schema is still newer than the time when this table was last altered, so we are still OK to serve. + if (storage_schema_version == query_schema_version || global_schema_version >= query_schema_version) + return std::make_tuple(storage_, lock, storage_schema_version, true); + // From now on we have global < query. + // Return false for outer to sync and retry. 
return std::make_tuple(nullptr, nullptr, storage_schema_version, false); }; diff --git a/dbms/src/Flash/CoprocessorHandler.cpp b/dbms/src/Flash/CoprocessorHandler.cpp index 2bc8707c4de..f0c2223c1b2 100644 --- a/dbms/src/Flash/CoprocessorHandler.cpp +++ b/dbms/src/Flash/CoprocessorHandler.cpp @@ -1,7 +1,6 @@ -#include - #include #include +#include #include #include #include @@ -46,8 +45,8 @@ try tipb::SelectResponse dag_response; DAGDriver driver(cop_context.db_context, dag_request, cop_context.kv_context.region_id(), cop_context.kv_context.region_epoch().version(), cop_context.kv_context.region_epoch().conf_ver(), - cop_request->start_ts() > 0 ? cop_request->start_ts() : dag_request.start_ts_fallback(), std::move(key_ranges), - dag_response); + cop_request->start_ts() > 0 ? cop_request->start_ts() : dag_request.start_ts_fallback(), cop_request->schema_ver(), + std::move(key_ranges), dag_response); driver.execute(); cop_response->set_data(dag_response.SerializeAsString()); LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done"); diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 5c8949b40ed..b3ec536c5e7 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -215,19 +215,28 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d /// Lock storage. auto lock = storage_->lockStructure(false, __PRETTY_FUNCTION__); - /// Check schema version. + /// Check schema version, requiring TiDB/TiSpark and TiFlash both use exactly the same schema. + // We have three schema versions, two in TiFlash: + // 1. Storage: the version that this TiFlash table (storage) was last altered. + // 2. Global: the version that TiFlash global schema is at. + // And one from TiDB/TiSpark: + // 3. Query: the version that TiDB/TiSpark used for this query. 
auto storage_schema_version = merge_tree->getTableInfo().schema_version; - // Not allow storage schema version greater than query schema version in any case. + // Not allow storage > query in any case, one example is time travel queries. if (storage_schema_version > query_schema_version) throw Exception("Table " + qualified_name + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(query_schema_version), ErrorCodes::SCHEMA_VERSION_ERROR); - - // If schema synced, we must be very recent so we are good as long as storage schema version is no greater than query schema version. - // If schema not synced, we are good if storage schema version is right on query schema version. - // Otherwise we are at the risk of out-of-date schema, but we still have a chance to be sure that we are good, if global schema version is greater than query schema version. - if ((schema_synced && storage_schema_version <= query_schema_version) - || (!schema_synced && (storage_schema_version == query_schema_version || global_schema_version > query_schema_version))) + // From now on we have storage <= query. + // If schema was synced, it implies that global >= query, as mentioned above we have storage <= query, we are OK to serve. + if (schema_synced) return std::make_tuple(storage_, lock, storage_schema_version, true); - + // From now on the schema was not synced. + // 1. storage == query, TiDB/TiSpark is using exactly the same schema that altered this table, we are just OK to serve. + // 2. global >= query, TiDB/TiSpark is using a schema older than TiFlash global, but as mentioned above we have storage <= query, + // meaning that the query schema is still newer than the time when this table was last altered, so we are still OK to serve. 
+ if (storage_schema_version == query_schema_version || global_schema_version >= query_schema_version) + return std::make_tuple(storage_, lock, storage_schema_version, true); + // From now on we have global < query. + // Return false for outer to sync and retry. return std::make_tuple(nullptr, nullptr, storage_schema_version, false); };