Skip to content

Commit

Permalink
FLASH-828: Use schema version in cop request (#388)
Browse files Browse the repository at this point in the history
* Check schema version in cop

* Refine get and lock table

* Fix format
  • Loading branch information
zanmato1984 authored Jan 10, 2020
1 parent 80ef503 commit 12faa8f
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 38 deletions.
1 change: 1 addition & 0 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Language: Cpp
AlignAfterOpenBracket: false
BreakBeforeBraces: Custom
BraceWrapping: {
AfterCaseLabel: 'true'
AfterClass: 'true'
AfterControlStatement: 'true'
AfterEnum : 'true'
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,8 @@ tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest
static Logger * log = &Logger::get("MockDAG");
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString());
tipb::SelectResponse dag_response;
DAGDriver driver(
context, dag_request, region_id, region_version, region_conf_version, start_ts, std::move(key_ranges), dag_response, true);
DAGDriver driver(context, dag_request, region_id, region_version, region_conf_version, start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION,
std::move(key_ranges), dag_response, true);
driver.execute();
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done");
return dag_response;
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
#include <Flash/Coprocessor/DAGDriver.h>

#include <Core/QueryProcessingStage.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/DAGStringConverter.h>
#include <Flash/Coprocessor/DAGUtils.h>
Expand All @@ -23,7 +22,7 @@ extern const int UNKNOWN_EXCEPTION;
} // namespace ErrorCodes

DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, RegionID region_id_, UInt64 region_version_,
UInt64 region_conf_version_, UInt64 start_ts, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> && key_ranges_,
UInt64 region_conf_version_, UInt64 start_ts, UInt64 schema_ver, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> && key_ranges_,
tipb::SelectResponse & dag_response_, bool internal_)
: context(context_),
dag_request(dag_request_),
Expand All @@ -36,6 +35,9 @@ DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_,
log(&Logger::get("DAGDriver"))
{
context.setSetting("read_tso", start_ts);
if (schema_ver)
// schema_ver being 0 means TiDB/TiSpark hasn't specified schema version.
context.setSetting("schema_version", schema_ver);
}

void DAGDriver::execute()
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGDriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ class DAGDriver
{
public:
DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, RegionID region_id_, UInt64 region_version_,
UInt64 region_conf_version_, UInt64 start_ts, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> && key_ranges_,
tipb::SelectResponse & dag_response_, bool internal_ = false);
UInt64 region_conf_version_, UInt64 start_ts, UInt64 schema_ver,
std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> && key_ranges_, tipb::SelectResponse & dag_response_,
bool internal_ = false);

void execute();

Expand Down
43 changes: 26 additions & 17 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#include <Flash/Coprocessor/InterpreterDAG.h>

#include <Core/TMTPKType.h>
#include <DataStreams/AggregatingBlockInputStream.h>
#include <DataStreams/BlockIO.h>
Expand All @@ -16,6 +14,7 @@
#include <Flash/Coprocessor/DAGQueryInfo.h>
#include <Flash/Coprocessor/DAGStringConverter.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Interpreters/Aggregator.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/MutableSupport.h>
Expand Down Expand Up @@ -46,7 +45,8 @@ extern const int COP_BAD_DAG_REQUEST;
InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_)
: context(context_),
dag(dag_),
keep_session_timezone_info(dag.getEncodeType() == tipb::EncodeType::TypeChunk || dag.getEncodeType() == tipb::EncodeType::TypeCHBlock),
keep_session_timezone_info(
dag.getEncodeType() == tipb::EncodeType::TypeChunk || dag.getEncodeType() == tipb::EncodeType::TypeCHBlock),
log(&Logger::get("InterpreterDAG"))
{}

Expand Down Expand Up @@ -161,8 +161,10 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
throw Exception("Table id not specified in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}
TableID table_id = ts.table_id();
// TODO: Get schema version from DAG request.
if (context.getSettingsRef().schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION)

const Settings & settings = context.getSettingsRef();

if (settings.schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION)
{
storage = context.getTMTContext().getStorages().get(table_id);
if (storage == nullptr)
Expand All @@ -173,7 +175,7 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
}
else
{
getAndLockStorageWithSchemaVersion(table_id, DEFAULT_UNSPECIFIED_SCHEMA_VERSION);
getAndLockStorageWithSchemaVersion(table_id, settings.schema_version);
}

Names required_columns;
Expand Down Expand Up @@ -228,8 +230,6 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
}
}
// todo handle alias column
const Settings & settings = context.getSettingsRef();

if (settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read)
{
throw Exception("Limit for number of columns to read exceeded. "
Expand Down Expand Up @@ -486,21 +486,30 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64
/// Lock storage.
auto lock = storage_->lockStructure(false, __PRETTY_FUNCTION__);

/// Check schema version.
/// Check schema version, requiring TiDB/TiSpark and TiFlash both use exactly the same schema.
// We have three schema versions, two in TiFlash:
// 1. Storage: the version that this TiFlash table (storage) was last altered.
// 2. Global: the version that TiFlash global schema is at.
// And one from TiDB/TiSpark:
// 3. Query: the version that TiDB/TiSpark used for this query.
auto storage_schema_version = storage_->getTableInfo().schema_version;
// Not allow storage schema version greater than query schema version in any case.
// Not allow storage > query in any case, one example is time travel queries.
if (storage_schema_version > query_schema_version)
throw Exception("Table " + std::to_string(table_id) + " schema version " + std::to_string(storage_schema_version)
+ " newer than query schema version " + std::to_string(query_schema_version),
ErrorCodes::SCHEMA_VERSION_ERROR);

// If schema synced, we must be very recent so we are good as long as storage schema version is no greater than query schema version.
// If schema not synced, we are good if storage schema version is right on query schema version.
// Otherwise we are at the risk of out-of-date schema, but we still have a chance to be sure that we are good, if global schema version is greater than query schema version.
if ((schema_synced && storage_schema_version <= query_schema_version)
|| (!schema_synced && (storage_schema_version == query_schema_version || global_schema_version > query_schema_version)))
// From now on we have storage <= query.
// If schema was synced, it implies that global >= query, as mentioned above we have storage <= query, we are OK to serve.
if (schema_synced)
return std::make_tuple(storage_, lock, storage_schema_version, true);

// From now on the schema was not synced.
// 1. storage == query, TiDB/TiSpark is using exactly the same schema that altered this table, we are just OK to serve.
// 2. global >= query, TiDB/TiSpark is using a schema older than TiFlash global, but as mentioned above we have storage <= query,
// meaning that the query schema is still newer than the time when this table was last altered, so we still OK to serve.
if (storage_schema_version == query_schema_version || global_schema_version >= query_schema_version)
return std::make_tuple(storage_, lock, storage_schema_version, true);
// From now on we have global < query.
// Return false for outer to sync and retry.
return std::make_tuple(nullptr, nullptr, storage_schema_version, false);
};

Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Flash/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include <Flash/CoprocessorHandler.h>

#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Flash/CoprocessorHandler.h>
#include <Storages/IStorage.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/LockException.h>
Expand Down Expand Up @@ -46,8 +45,8 @@ try
tipb::SelectResponse dag_response;
DAGDriver driver(cop_context.db_context, dag_request, cop_context.kv_context.region_id(),
cop_context.kv_context.region_epoch().version(), cop_context.kv_context.region_epoch().conf_ver(),
cop_request->start_ts() > 0 ? cop_request->start_ts() : dag_request.start_ts_fallback(), std::move(key_ranges),
dag_response);
cop_request->start_ts() > 0 ? cop_request->start_ts() : dag_request.start_ts_fallback(), cop_request->schema_ver(),
std::move(key_ranges), dag_response);
driver.execute();
cop_response->set_data(dag_response.SerializeAsString());
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done");
Expand Down
27 changes: 18 additions & 9 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,19 +215,28 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d
/// Lock storage.
auto lock = storage_->lockStructure(false, __PRETTY_FUNCTION__);

/// Check schema version.
/// Check schema version, requiring TiDB/TiSpark and TiFlash both use exactly the same schema.
// We have three schema versions, two in TiFlash:
// 1. Storage: the version that this TiFlash table (storage) was last altered.
// 2. Global: the version that TiFlash global schema is at.
// And one from TiDB/TiSpark:
// 3. Query: the version that TiDB/TiSpark used for this query.
auto storage_schema_version = merge_tree->getTableInfo().schema_version;
// Not allow storage schema version greater than query schema version in any case.
// Not allow storage > query in any case, one example is time travel queries.
if (storage_schema_version > query_schema_version)
throw Exception("Table " + qualified_name + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(query_schema_version), ErrorCodes::SCHEMA_VERSION_ERROR);

// If schema synced, we must be very recent so we are good as long as storage schema version is no greater than query schema version.
// If schema not synced, we are good if storage schema version is right on query schema version.
// Otherwise we are at the risk of out-of-date schema, but we still have a chance to be sure that we are good, if global schema version is greater than query schema version.
if ((schema_synced && storage_schema_version <= query_schema_version)
|| (!schema_synced && (storage_schema_version == query_schema_version || global_schema_version > query_schema_version)))
// From now on we have storage <= query.
// If schema was synced, it implies that global >= query, as mentioned above we have storage <= query, we are OK to serve.
if (schema_synced)
return std::make_tuple(storage_, lock, storage_schema_version, true);

// From now on the schema was not synced.
// 1. storage == query, TiDB/TiSpark is using exactly the same schema that altered this table, we are just OK to serve.
// 2. global >= query, TiDB/TiSpark is using a schema older than TiFlash global, but as mentioned above we have storage <= query,
// meaning that the query schema is still newer than the time when this table was last altered, so we still OK to serve.
if (storage_schema_version == query_schema_version || global_schema_version >= query_schema_version)
return std::make_tuple(storage_, lock, storage_schema_version, true);
// From now on we have global < query.
// Return false for outer to sync and retry.
return std::make_tuple(nullptr, nullptr, storage_schema_version, false);
};

Expand Down

0 comments on commit 12faa8f

Please sign in to comment.