Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FLASH-828: Use schema version in cop request #388

Merged
merged 4 commits into from
Jan 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Language: Cpp
AlignAfterOpenBracket: false
BreakBeforeBraces: Custom
BraceWrapping: {
AfterCaseLabel: 'true'
AfterClass: 'true'
AfterControlStatement: 'true'
AfterEnum : 'true'
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,8 @@ tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest
static Logger * log = &Logger::get("MockDAG");
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString());
tipb::SelectResponse dag_response;
DAGDriver driver(
context, dag_request, region_id, region_version, region_conf_version, start_ts, std::move(key_ranges), dag_response, true);
DAGDriver driver(context, dag_request, region_id, region_version, region_conf_version, start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION,
std::move(key_ranges), dag_response, true);
driver.execute();
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done");
return dag_response;
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
#include <Flash/Coprocessor/DAGDriver.h>

#include <Core/QueryProcessingStage.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/DAGStringConverter.h>
#include <Flash/Coprocessor/DAGUtils.h>
Expand All @@ -23,7 +22,7 @@ extern const int UNKNOWN_EXCEPTION;
} // namespace ErrorCodes

DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, RegionID region_id_, UInt64 region_version_,
UInt64 region_conf_version_, UInt64 start_ts, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> && key_ranges_,
UInt64 region_conf_version_, UInt64 start_ts, UInt64 schema_ver, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> && key_ranges_,
tipb::SelectResponse & dag_response_, bool internal_)
: context(context_),
dag_request(dag_request_),
Expand All @@ -36,6 +35,9 @@ DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_,
log(&Logger::get("DAGDriver"))
{
context.setSetting("read_tso", start_ts);
if (schema_ver)
// schema_ver being 0 means TiDB/TiSpark hasn't specified schema version.
context.setSetting("schema_version", schema_ver);
}

void DAGDriver::execute()
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGDriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ class DAGDriver
{
public:
DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, RegionID region_id_, UInt64 region_version_,
UInt64 region_conf_version_, UInt64 start_ts, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> && key_ranges_,
tipb::SelectResponse & dag_response_, bool internal_ = false);
UInt64 region_conf_version_, UInt64 start_ts, UInt64 schema_ver,
std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> && key_ranges_, tipb::SelectResponse & dag_response_,
bool internal_ = false);

void execute();

Expand Down
43 changes: 26 additions & 17 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#include <Flash/Coprocessor/InterpreterDAG.h>

#include <Core/TMTPKType.h>
#include <DataStreams/AggregatingBlockInputStream.h>
#include <DataStreams/BlockIO.h>
Expand All @@ -16,6 +14,7 @@
#include <Flash/Coprocessor/DAGQueryInfo.h>
#include <Flash/Coprocessor/DAGStringConverter.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Interpreters/Aggregator.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/MutableSupport.h>
Expand Down Expand Up @@ -46,7 +45,8 @@ extern const int COP_BAD_DAG_REQUEST;
InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_)
: context(context_),
dag(dag_),
keep_session_timezone_info(dag.getEncodeType() == tipb::EncodeType::TypeChunk || dag.getEncodeType() == tipb::EncodeType::TypeCHBlock),
keep_session_timezone_info(
dag.getEncodeType() == tipb::EncodeType::TypeChunk || dag.getEncodeType() == tipb::EncodeType::TypeCHBlock),
log(&Logger::get("InterpreterDAG"))
{}

Expand Down Expand Up @@ -161,8 +161,10 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
throw Exception("Table id not specified in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}
TableID table_id = ts.table_id();
// TODO: Get schema version from DAG request.
if (context.getSettingsRef().schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION)

const Settings & settings = context.getSettingsRef();

if (settings.schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION)
{
storage = context.getTMTContext().getStorages().get(table_id);
if (storage == nullptr)
Expand All @@ -173,7 +175,7 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
}
else
{
getAndLockStorageWithSchemaVersion(table_id, DEFAULT_UNSPECIFIED_SCHEMA_VERSION);
getAndLockStorageWithSchemaVersion(table_id, settings.schema_version);
}

Names required_columns;
Expand Down Expand Up @@ -228,8 +230,6 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
}
}
// todo handle alias column
const Settings & settings = context.getSettingsRef();

if (settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read)
{
throw Exception("Limit for number of columns to read exceeded. "
Expand Down Expand Up @@ -486,21 +486,30 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64
/// Lock storage.
auto lock = storage_->lockStructure(false, __PRETTY_FUNCTION__);

/// Check schema version.
/// Check schema version, requiring TiDB/TiSpark and TiFlash both use exactly the same schema.
// We have three schema versions, two in TiFlash:
// 1. Storage: the version that this TiFlash table (storage) was last altered.
// 2. Global: the version that TiFlash global schema is at.
// And one from TiDB/TiSpark:
// 3. Query: the version that TiDB/TiSpark used for this query.
auto storage_schema_version = storage_->getTableInfo().schema_version;
// Not allow storage schema version greater than query schema version in any case.
// Not allow storage > query in any case, one example is time travel queries.
if (storage_schema_version > query_schema_version)
throw Exception("Table " + std::to_string(table_id) + " schema version " + std::to_string(storage_schema_version)
+ " newer than query schema version " + std::to_string(query_schema_version),
ErrorCodes::SCHEMA_VERSION_ERROR);

// If schema synced, we must be very recent so we are good as long as storage schema version is no greater than query schema version.
// If schema not synced, we are good if storage schema version is right on query schema version.
// Otherwise we are at the risk of out-of-date schema, but we still have a chance to be sure that we are good, if global schema version is greater than query schema version.
if ((schema_synced && storage_schema_version <= query_schema_version)
|| (!schema_synced && (storage_schema_version == query_schema_version || global_schema_version > query_schema_version)))
// From now on we have storage <= query.
// If schema was synced, it implies that global >= query, as mentioned above we have storage <= query, we are OK to serve.
if (schema_synced)
return std::make_tuple(storage_, lock, storage_schema_version, true);

// From now on the schema was not synced.
// 1. storage == query, TiDB/TiSpark is using exactly the same schema that altered this table, we are just OK to serve.
// 2. global >= query, TiDB/TiSpark is using a schema older than TiFlash global, but as mentioned above we have storage <= query,
// meaning that the query schema is still newer than the time when this table was last altered, so we still OK to serve.
if (storage_schema_version == query_schema_version || global_schema_version >= query_schema_version)
return std::make_tuple(storage_, lock, storage_schema_version, true);
// From now on we have global < query.
// Return false for outer to sync and retry.
return std::make_tuple(nullptr, nullptr, storage_schema_version, false);
};

Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Flash/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include <Flash/CoprocessorHandler.h>

#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Flash/CoprocessorHandler.h>
#include <Storages/IStorage.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/LockException.h>
Expand Down Expand Up @@ -46,8 +45,8 @@ try
tipb::SelectResponse dag_response;
DAGDriver driver(cop_context.db_context, dag_request, cop_context.kv_context.region_id(),
cop_context.kv_context.region_epoch().version(), cop_context.kv_context.region_epoch().conf_ver(),
cop_request->start_ts() > 0 ? cop_request->start_ts() : dag_request.start_ts_fallback(), std::move(key_ranges),
dag_response);
cop_request->start_ts() > 0 ? cop_request->start_ts() : dag_request.start_ts_fallback(), cop_request->schema_ver(),
std::move(key_ranges), dag_response);
driver.execute();
cop_response->set_data(dag_response.SerializeAsString());
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done");
Expand Down
27 changes: 18 additions & 9 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,19 +215,28 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d
/// Lock storage.
auto lock = storage_->lockStructure(false, __PRETTY_FUNCTION__);

/// Check schema version.
/// Check schema version, requiring TiDB/TiSpark and TiFlash both use exactly the same schema.
// We have three schema versions, two in TiFlash:
// 1. Storage: the version that this TiFlash table (storage) was last altered.
// 2. Global: the version that TiFlash global schema is at.
// And one from TiDB/TiSpark:
// 3. Query: the version that TiDB/TiSpark used for this query.
auto storage_schema_version = merge_tree->getTableInfo().schema_version;
// Not allow storage schema version greater than query schema version in any case.
// Not allow storage > query in any case, one example is time travel queries.
if (storage_schema_version > query_schema_version)
throw Exception("Table " + qualified_name + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(query_schema_version), ErrorCodes::SCHEMA_VERSION_ERROR);

// If schema synced, we must be very recent so we are good as long as storage schema version is no greater than query schema version.
// If schema not synced, we are good if storage schema version is right on query schema version.
// Otherwise we are at the risk of out-of-date schema, but we still have a chance to be sure that we are good, if global schema version is greater than query schema version.
if ((schema_synced && storage_schema_version <= query_schema_version)
|| (!schema_synced && (storage_schema_version == query_schema_version || global_schema_version > query_schema_version)))
// From now on we have storage <= query.
// If schema was synced, it implies that global >= query, as mentioned above we have storage <= query, we are OK to serve.
if (schema_synced)
return std::make_tuple(storage_, lock, storage_schema_version, true);

// From now on the schema was not synced.
// 1. storage == query, TiDB/TiSpark is using exactly the same schema that altered this table, we are just OK to serve.
// 2. global >= query, TiDB/TiSpark is using a schema older than TiFlash global, but as mentioned above we have storage <= query,
// meaning that the query schema is still newer than the time when this table was last altered, so we still OK to serve.
if (storage_schema_version == query_schema_version || global_schema_version >= query_schema_version)
return std::make_tuple(storage_, lock, storage_schema_version, true);
// From now on we have global < query.
// Return false for outer to sync and retry.
return std::make_tuple(nullptr, nullptr, storage_schema_version, false);
};

Expand Down