Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix "Output offset index is out of bound" error for some update statements #338

Merged
merged 7 commits into from
Dec 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 27 additions & 33 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/parseQuery.h>
#include <Storages/MutableSupport.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/Datum.h>
#include <Storages/Transaction/KVStore.h>
Expand Down Expand Up @@ -145,8 +146,6 @@ BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args)
return outputDAGResponse(context, schema, dag_response);
}

const String VOID_COL_NAME = "_void";

struct ExecutorCtx
{
tipb::Executor * input;
Expand Down Expand Up @@ -388,6 +387,21 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(Context & context,
hijackTiDBTypeForMockTest(ci);
ts_output.emplace_back(std::make_pair(column_info.name, std::move(ci)));
}
for (const auto & expr : ast_query.select_expression_list->children)
{
if (ASTIdentifier * identifier = typeid_cast<ASTIdentifier *>(expr.get()))
{
if (identifier->getColumnName() == MutableSupport::tidb_pk_column_name)
{
ColumnInfo ci;
ci.tp = TiDB::TypeLongLong;
ci.setPriKeyFlag();
ci.setNotNullFlag();
hijackTiDBTypeForMockTest(ci);
ts_output.emplace_back(std::make_pair(MutableSupport::tidb_pk_column_name, std::move(ci)));
}
}
}
executor_ctx_map.emplace(
ts_exec, ExecutorCtx{nullptr, std::move(ts_output), std::unordered_map<String, std::vector<tipb::Expr *>>{}});
last_executor = ts_exec;
Expand Down Expand Up @@ -450,7 +464,10 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(Context & context,
for (const auto & info : executor_ctx.output)
{
tipb::ColumnInfo * ci = ts->add_columns();
ci->set_column_id(table_info.getColumnID(info.first));
if (info.first == MutableSupport::tidb_pk_column_name)
ci->set_column_id(-1);
else
ci->set_column_id(table_info.getColumnID(info.first));
ci->set_tp(info.second.tp);
ci->set_flag(info.second.flag);
ci->set_columnlen(info.second.flen);
Expand Down Expand Up @@ -590,37 +607,14 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(Context & context,

const auto & last_output = executor_ctx_map[last_executor].output;

// For testing VOID column, ignore any other select expressions, unless table contains it.
if (std::find(final_output.begin(), final_output.end(), VOID_COL_NAME) != final_output.end()
&& std::find_if(
last_output.begin(), last_output.end(), [&](const auto & last_field) { return last_field.first == VOID_COL_NAME; })
== last_output.end())
{
dag_request.add_output_offsets(0);

// Set column ID to -1 to trigger `void` column in DAG processing.
tipb::ColumnInfo * ci = ts->add_columns();
ci->set_column_id(-1);

// Set column name to VOID and tp to Nullable(UInt64),
// as chunk decoding doesn't do strict field type check so Nullable(UInt64) should be enough.
ColumnInfo ti_ci;
ti_ci.name = VOID_COL_NAME;
ti_ci.tp = TiDB::TypeLongLong;
ti_ci.setNotNullFlag();
schema.emplace_back(DAGColumnInfo{VOID_COL_NAME, std::move(ti_ci)});
}
else
for (const auto & field : final_output)
{
for (const auto & field : final_output)
{
auto iter = std::find_if(
last_output.begin(), last_output.end(), [&](const auto & last_field) { return last_field.first == field; });
if (iter == last_output.end())
throw Exception("Column not found after pruning: " + field, ErrorCodes::LOGICAL_ERROR);
dag_request.add_output_offsets(iter - last_output.begin());
schema.push_back(*iter);
}
auto iter
= std::find_if(last_output.begin(), last_output.end(), [&](const auto & last_field) { return last_field.first == field; });
if (iter == last_output.end())
throw Exception("Column not found after pruning: " + field, ErrorCodes::LOGICAL_ERROR);
dag_request.add_output_offsets(iter - last_output.begin());
schema.push_back(*iter);
}
}

Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ struct DAGContext
{
DAGContext(size_t profile_list_size) { profile_streams_list.resize(profile_list_size); };
std::vector<BlockInputStreams> profile_streams_list;

tipb::FieldType void_result_ft;
};

} // namespace DB
7 changes: 2 additions & 5 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,8 @@ try
// Only query is allowed, so streams.in must not be null and streams.out must be null
throw Exception("DAG is not query.", ErrorCodes::LOGICAL_ERROR);

BlockOutputStreamPtr dag_output_stream = std::make_shared<DAGBlockOutputStream>(dag_response,
context.getSettings().dag_records_per_chunk,
dag.getEncodeType(),
dag.getResultFieldTypes(dag.getDAGContext().void_result_ft),
streams.in->getHeader());
BlockOutputStreamPtr dag_output_stream = std::make_shared<DAGBlockOutputStream>(
dag_response, context.getSettings().dag_records_per_chunk, dag.getEncodeType(), dag.getResultFieldTypes(), streams.in->getHeader());
copyData(*streams.in, *dag_output_stream);

if (!dag_request.has_collect_execution_summaries() || !dag_request.collect_execution_summaries())
Expand Down
12 changes: 3 additions & 9 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ DAGQuerySource::DAGQuerySource(Context & context_, DAGContext & dag_context_, Re
break;
default:
throw Exception(
"Unsupported executor in DAG request: " + dag_request.executors(i).DebugString(), ErrorCodes::NOT_IMPLEMENTED);
"Unsupported executor in DAG request: " + dag_request.executors(i).DebugString(), ErrorCodes::NOT_IMPLEMENTED);
}
}
encode_type = dag_request.encode_type();
if (encode_type == tipb::EncodeType::TypeArrow && hasUnsupportedTypeForArrowEncode(getResultFieldTypes({})))
if (encode_type == tipb::EncodeType::TypeArrow && hasUnsupportedTypeForArrowEncode(getResultFieldTypes()))
{
encode_type = tipb::EncodeType::TypeDefault;
}
Expand Down Expand Up @@ -100,8 +100,6 @@ bool fillExecutorOutputFieldTypes(const tipb::Executor & executor, std::vector<t
case tipb::ExecType::TypeTableScan:
for (auto & ci : executor.tbl_scan().columns())
{
if (ci.column_id() == -1)
continue;
field_type.set_tp(ci.tp());
field_type.set_flag(ci.flag());
field_type.set_flen(ci.columnlen());
Expand Down Expand Up @@ -133,17 +131,13 @@ bool fillExecutorOutputFieldTypes(const tipb::Executor & executor, std::vector<t
}
}

std::vector<tipb::FieldType> DAGQuerySource::getResultFieldTypes(const tipb::FieldType & void_result_ft) const
std::vector<tipb::FieldType> DAGQuerySource::getResultFieldTypes() const
{
std::vector<tipb::FieldType> executor_output;
for (int i = dag_request.executors_size() - 1; i >= 0; i--)
{
if (fillExecutorOutputFieldTypes(dag_request.executors(i), executor_output))
{
if (executor_output.empty())
executor_output.push_back(void_result_ft);
break;
}
}
if (executor_output.empty())
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGQuerySource.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class DAGQuerySource : public IQuerySource
};
const tipb::DAGRequest & getDAGRequest() const { return dag_request; };

std::vector<tipb::FieldType> getResultFieldTypes(const tipb::FieldType & void_result_ft) const;
std::vector<tipb::FieldType> getResultFieldTypes() const;

ASTPtr getAST() const { return ast; };

Expand Down
61 changes: 32 additions & 29 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,18 @@ bool checkKeyRanges(const std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>>
else
return isAllValueCoveredByRanges<Int64>(handle_ranges, region_handle_ranges);
}

/// Classify whether the region referenced by the DAG request can be read.
/// Checks are ordered from cheapest/most-fundamental to most specific:
/// existence, then epoch (version/conf version) match, then removal state.
/// Returns RegionException::OK only if all checks pass; callers use any
/// non-OK status to raise a RegionException for the offending region.
RegionException::RegionReadStatus InterpreterDAG::getRegionReadStatus(RegionPtr current_region)
{
// Region no longer present in the local KVStore.
if (!current_region)
return RegionException::NOT_FOUND;
// Epoch mismatch: the request was built against a different region
// version or conf version, so the data layout may have changed.
if (current_region->version() != dag.getRegionVersion() || current_region->confVer() != dag.getRegionConfVersion())
return RegionException::VERSION_ERROR;
// Region is scheduled for removal; reading it would be unsafe.
if (current_region->isPendingRemove())
return RegionException::PENDING_REMOVE;
return RegionException::OK;
}

// the flow is the same as executeFetchcolumns
void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
{
Expand Down Expand Up @@ -171,40 +183,31 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
ColumnID cid = ci.column_id();

if (cid == -1)
// Column ID -1 means TiDB expects no specific column, mostly it is for cases like `select count(*)`.
// This means we can return whatever column, we'll choose it later if no other columns are specified either.
{
// Column ID -1 return the handle column
if (auto pk_handle_col = storage->getTableInfo().getPKHandleColumn())
{
required_columns.push_back(pk_handle_col->get().name);
auto pair = storage->getColumns().getPhysical(pk_handle_col->get().name);
source_columns.push_back(pair);
is_ts_column.push_back(false);
}
else
{
required_columns.push_back(MutableSupport::tidb_pk_column_name);
auto pair = storage->getColumns().getPhysical(MutableSupport::tidb_pk_column_name);
source_columns.push_back(pair);
is_ts_column.push_back(false);
}
continue;
}

String name = storage->getTableInfo().getColumnName(cid);
required_columns.push_back(name);
auto pair = storage->getColumns().getPhysical(name);
source_columns.emplace_back(std::move(pair));
is_ts_column.push_back(ci.tp() == TiDB::TypeTimestamp);
}
if (required_columns.empty())
{
// No column specified, we choose the handle column as it will be emitted by storage read anyhow.
// Set `void` column field type correspondingly for further needs, i.e. encoding results.
if (auto pk_handle_col = storage->getTableInfo().getPKHandleColumn())
{
required_columns.push_back(pk_handle_col->get().name);
auto pair = storage->getColumns().getPhysical(pk_handle_col->get().name);
source_columns.push_back(pair);
is_ts_column.push_back(false);
// For PK handle, use original column info of itself.
dag.getDAGContext().void_result_ft = columnInfoToFieldType(pk_handle_col->get());
}
else
{
required_columns.push_back(MutableSupport::tidb_pk_column_name);
auto pair = storage->getColumns().getPhysical(MutableSupport::tidb_pk_column_name);
source_columns.push_back(pair);
is_ts_column.push_back(false);
// For implicit handle, reverse get a column info.
auto column_info = reverseGetColumnInfo(pair, -1, Field());
dag.getDAGContext().void_result_ft = columnInfoToFieldType(column_info);
}
}

analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);

Expand Down Expand Up @@ -262,12 +265,12 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
info.version = dag.getRegionVersion();
info.conf_version = dag.getRegionConfVersion();
auto current_region = context.getTMTContext().getKVStore()->getRegion(info.region_id);
if (!current_region || current_region->version() != dag.getRegionVersion() || current_region->confVer() != dag.getRegionConfVersion())
auto region_read_status = getRegionReadStatus(current_region);
if (region_read_status != RegionException::OK)
{
std::vector<RegionID> region_ids;
region_ids.push_back(info.region_id);
throw RegionException(std::move(region_ids),
current_region ? RegionException::RegionReadStatus::NOT_FOUND : RegionException::RegionReadStatus::VERSION_ERROR);
throw RegionException(std::move(region_ids), region_read_status);
}
if (!checkKeyRanges(dag.getKeyRanges(), table_id, storage->pkIsUInt64(), current_region->getRange()))
throw Exception("Cop request only support full range scan for given region", ErrorCodes::COP_BAD_DAG_REQUEST);
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/IInterpreter.h>
#include <Raft/RaftService.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/Transaction/RegionException.h>
#include <Storages/Transaction/TMTStorages.h>

namespace DB
Expand Down Expand Up @@ -84,6 +86,7 @@ class InterpreterDAG : public IInterpreter
AnalysisResult analyzeExpressions();
void recordProfileStreams(Pipeline & pipeline, Int32 index);
bool addTimeZoneCastAfterTS(std::vector<bool> & is_ts_column, Pipeline & pipeline);
RegionException::RegionReadStatus getRegionReadStatus(RegionPtr current_region);

private:
Context & context;
Expand Down
15 changes: 10 additions & 5 deletions tests/mutable-test/txn_dag/table_scan.test
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,16 @@
│ test1 │
└───────┘

# TiDB may push down a table scan with a -1 column; use the keyword _void to test this case.
=> DBGInvoke dag('select _void from default.test') " --dag_planner="optree
┌─_void─┐
│ 50 │
└───────┘
# select TiDB rowid
=> DBGInvoke dag('select _tidb_rowid from default.test') " --dag_planner="optree
┌─_tidb_rowid─┐
│ 50 │
└─────────────┘

=> DBGInvoke dag('select _tidb_rowid, col_1 from default.test') " --dag_planner="optree
┌─_tidb_rowid─┬─col_1─┐
│ 50 │ test1 │
└─────────────┴───────┘

# Clean up.
=> DBGInvoke __drop_tidb_table(default, test)
Expand Down