Skip to content

Commit

Permalink
Fix the bug that storage returns truncated string(n) index
Browse files Browse the repository at this point in the history
  • Loading branch information
cangfengzhs committed Sep 10, 2021
1 parent 3e6a7f8 commit f881c1f
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 30 deletions.
90 changes: 68 additions & 22 deletions src/storage/exec/IndexOutputNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,40 @@ class IndexOutputNode final : public RelNode<T> {
RuntimeContext* context,
IndexScanNode<T>* indexScanNode,
bool hasNullableCol,
const std::vector<meta::cpp2::ColumnDef>& fields)
const std::vector<meta::cpp2::ColumnDef>& fields,
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas)
: result_(result),
context_(context),
indexScanNode_(indexScanNode),
hasNullableCol_(hasNullableCol),
fields_(fields) {
fields_(fields),
schemas_(schemas) {
type_ = context_->isEdge() ? IndexResultType::kEdgeFromIndexScan
: IndexResultType::kVertexFromIndexScan;
}

IndexOutputNode(nebula::DataSet* result, RuntimeContext* context, IndexEdgeNode<T>* indexEdgeNode)
: result_(result), context_(context), indexEdgeNode_(indexEdgeNode) {
IndexOutputNode(nebula::DataSet* result,
RuntimeContext* context,
IndexEdgeNode<T>* indexEdgeNode,
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas)
: result_(result), context_(context), indexEdgeNode_(indexEdgeNode), schemas_(schemas) {
type_ = IndexResultType::kEdgeFromDataScan;
}

IndexOutputNode(nebula::DataSet* result,
RuntimeContext* context,
IndexVertexNode<T>* indexVertexNode)
: result_(result), context_(context), indexVertexNode_(indexVertexNode) {
IndexVertexNode<T>* indexVertexNode,
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas)
: result_(result), context_(context), indexVertexNode_(indexVertexNode), schemas_(schemas) {
type_ = IndexResultType::kVertexFromDataScan;
}

IndexOutputNode(nebula::DataSet* result,
RuntimeContext* context,
IndexFilterNode<T>* indexFilterNode,
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas,
bool indexFilter = false)
: result_(result), context_(context), indexFilterNode_(indexFilterNode) {
: result_(result), context_(context), indexFilterNode_(indexFilterNode), schemas_(schemas) {
hasNullableCol_ = indexFilterNode->hasNullableCol();
fields_ = indexFilterNode_->indexCols();
if (indexFilter) {
Expand All @@ -79,6 +86,7 @@ class IndexOutputNode final : public RelNode<T> {
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
partId_ = partId;

switch (type_) {
case IndexResultType::kEdgeFromIndexScan: {
Expand Down Expand Up @@ -124,8 +132,7 @@ class IndexOutputNode final : public RelNode<T> {
}
auto ret = nebula::cpp2::ErrorCode::SUCCEEDED;
switch (type_) {
case IndexResultType::kEdgeFromIndexScan:
case IndexResultType::kEdgeFromIndexFilter: {
case IndexResultType::kEdgeFromIndexScan: {
ret = edgeRowsFromIndex(data);
break;
}
Expand All @@ -134,8 +141,7 @@ class IndexOutputNode final : public RelNode<T> {
ret = edgeRowsFromData(data);
break;
}
case IndexResultType::kVertexFromIndexScan:
case IndexResultType::kVertexFromIndexFilter: {
case IndexResultType::kVertexFromIndexScan: {
ret = vertexRowsFromIndex(data);
break;
}
Expand All @@ -144,26 +150,22 @@ class IndexOutputNode final : public RelNode<T> {
ret = vertexRowsFromData(data);
break;
}
default:
LOG(FATAL) << "Unexpect IndexResultType:" << int(type_);
}
return ret;
}

nebula::cpp2::ErrorCode vertexRowsFromData(const std::vector<kvstore::KV>& data) {
const auto& schemas = type_ == IndexResultType::kVertexFromDataScan
? indexVertexNode_->getSchemas()
: indexFilterNode_->getSchemas();
if (schemas.empty()) {
return nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND;
}
for (const auto& val : data) {
Row row;
auto reader = RowReaderWrapper::getRowReader(schemas, val.second);
auto reader = RowReaderWrapper::getRowReader(schemas_, val.second);
if (!reader) {
VLOG(1) << "Can't get tag reader";
return nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND;
}
for (const auto& col : result_->colNames) {
auto ret = addIndexValue(row, reader.get(), val, col, schemas.back().get());
auto ret = addIndexValue(row, reader.get(), val, col, schemas_.back().get());
if (!ret.ok()) {
return nebula::cpp2::ErrorCode::E_INVALID_DATA;
}
Expand All @@ -177,7 +179,7 @@ class IndexOutputNode final : public RelNode<T> {
for (const auto& val : data) {
Row row;
for (const auto& col : result_->colNames) {
auto ret = addIndexValue(row, val, col);
auto ret = addIndexValue(row, val, col, schemas_.back().get());
if (!ret.ok()) {
return nebula::cpp2::ErrorCode::E_INVALID_DATA;
}
Expand Down Expand Up @@ -216,7 +218,7 @@ class IndexOutputNode final : public RelNode<T> {
for (const auto& val : data) {
Row row;
for (const auto& col : result_->colNames) {
auto ret = addIndexValue(row, val, col);
auto ret = addIndexValue(row, val, col, schemas_.back().get());
if (!ret.ok()) {
return nebula::cpp2::ErrorCode::E_INVALID_DATA;
}
Expand Down Expand Up @@ -285,7 +287,10 @@ class IndexOutputNode final : public RelNode<T> {
}

// Add the value by index key
Status addIndexValue(Row& row, const kvstore::KV& data, const std::string& col) {
Status addIndexValue(Row& row,
const kvstore::KV& data,
const std::string& col,
const meta::NebulaSchemaProvider* schema) {
switch (QueryUtils::toReturnColType(col)) {
case QueryUtils::ReturnColType::kVid: {
auto vId = IndexKeyUtils::getIndexVertexID(context_->vIdLen(), data.first);
Expand Down Expand Up @@ -329,6 +334,45 @@ class IndexOutputNode final : public RelNode<T> {
default: {
auto v = IndexKeyUtils::getValueFromIndexKey(
context_->vIdLen(), data.first, col, fields_, context_->isEdge(), hasNullableCol_);
if (v.isStr()) {
auto iter = std::find_if(fields_.begin(), fields_.end(), [&col](const auto& field) {
return col == field.get_name();
});
if (iter == fields_.end()) {
return Status::Error("Bad data");
}
if (static_cast<int>(v.toString().size()) >=
iter->get_type().type_length_ref().value_or(std::numeric_limits<int16_t>::max())) {
RowReaderWrapper reader;
std::string val;
if (context_->isEdge()) {
auto srcId = NebulaKeyUtils::getSrcId(context_->vIdLen(), data.first);
auto dstId = NebulaKeyUtils::getDstId(context_->vIdLen(), data.first);
auto rank = NebulaKeyUtils::getRank(context_->vIdLen(), data.first);
auto key = NebulaKeyUtils::edgeKey(context_->vIdLen(),
partId_,
srcId.toString(),
context_->edgeType_,
rank,
dstId.toString());
auto ret = context_->env()->kvstore_->get(context_->spaceId(), partId_, key, &val);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("Bad data");
}
reader.reset(schema, val);
} else {
auto vId = IndexKeyUtils::getIndexVertexID(context_->vIdLen(), data.first);
auto key = NebulaKeyUtils::vertexKey(
context_->vIdLen(), partId_, vId.toString(), context_->tagId_);
auto ret = context_->env()->kvstore_->get(context_->spaceId(), partId_, key, &val);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("Bad data");
}
reader.reset(schema, val);
}
v = QueryUtils::readValue(&reader, col, schema).value();
}
}
row.emplace_back(std::move(v));
}
}
Expand All @@ -345,6 +389,8 @@ class IndexOutputNode final : public RelNode<T> {
IndexFilterNode<T>* indexFilterNode_{nullptr};
bool hasNullableCol_{};
std::vector<meta::cpp2::ColumnDef> fields_;
PartitionID partId_;
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas_;
};

} // namespace storage
Expand Down
20 changes: 13 additions & 7 deletions src/storage/index/LookupBaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ StatusOr<StoragePlan<IndexID>> LookupBaseProcessor<REQ, RESP>::buildPlan(
out = buildPlanBasic(result, ctx, plan, hasNullableCol, fields);
}
} else {
DLOG(INFO) << "buildPlanWithDataAndFilter";

auto expr = Expression::decode(pool, ctx.get_filter());
// Need to get columns in data, expr ctx need to be aware of schema
const auto& schemaName = context_->isEdge() ? context_->edgeName_ : context_->tagName_;
Expand Down Expand Up @@ -272,7 +274,7 @@ std::unique_ptr<IndexOutputNode<IndexID>> LookupBaseProcessor<REQ, RESP>::buildP
std::make_unique<IndexScanNode<IndexID>>(context_.get(), indexId, std::move(colHints));

auto output = std::make_unique<IndexOutputNode<IndexID>>(
result, context_.get(), indexScan.get(), hasNullableCol, fields);
result, context_.get(), indexScan.get(), hasNullableCol, fields, schemas_);
output->addDependency(indexScan.get());
plan.addNode(std::move(indexScan));
return output;
Expand Down Expand Up @@ -308,7 +310,8 @@ std::unique_ptr<IndexOutputNode<IndexID>> LookupBaseProcessor<REQ, RESP>::buildP
auto edge = std::make_unique<IndexEdgeNode<IndexID>>(
context_.get(), indexScan.get(), schemas_, context_->edgeName_);
edge->addDependency(indexScan.get());
auto output = std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), edge.get());
auto output =
std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), edge.get(), schemas_);
output->addDependency(edge.get());
plan.addNode(std::move(indexScan));
plan.addNode(std::move(edge));
Expand All @@ -317,7 +320,8 @@ std::unique_ptr<IndexOutputNode<IndexID>> LookupBaseProcessor<REQ, RESP>::buildP
auto vertex = std::make_unique<IndexVertexNode<IndexID>>(
context_.get(), indexScan.get(), schemas_, context_->tagName_);
vertex->addDependency(indexScan.get());
auto output = std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), vertex.get());
auto output =
std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), vertex.get(), schemas_);
output->addDependency(vertex.get());
plan.addNode(std::move(indexScan));
plan.addNode(std::move(vertex));
Expand Down Expand Up @@ -359,8 +363,8 @@ std::unique_ptr<IndexOutputNode<IndexID>> LookupBaseProcessor<REQ, RESP>::buildP
auto filter = std::make_unique<IndexFilterNode<IndexID>>(
context_.get(), indexScan.get(), exprCtx, exp, context_->isEdge());
filter->addDependency(indexScan.get());
auto output =
std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), filter.get(), true);
auto output = std::make_unique<IndexOutputNode<IndexID>>(
result, context_.get(), filter.get(), schemas_, true);
output->addDependency(filter.get());
plan.addNode(std::move(indexScan));
plan.addNode(std::move(filter));
Expand Down Expand Up @@ -414,7 +418,8 @@ LookupBaseProcessor<REQ, RESP>::buildPlanWithDataAndFilter(nebula::DataSet* resu
std::make_unique<IndexFilterNode<IndexID>>(context_.get(), edge.get(), exprCtx, exp);
filter->addDependency(edge.get());

auto output = std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), filter.get());
auto output =
std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), filter.get(), schemas_);
output->addDependency(filter.get());
plan.addNode(std::move(indexScan));
plan.addNode(std::move(edge));
Expand All @@ -428,7 +433,8 @@ LookupBaseProcessor<REQ, RESP>::buildPlanWithDataAndFilter(nebula::DataSet* resu
std::make_unique<IndexFilterNode<IndexID>>(context_.get(), vertex.get(), exprCtx, exp);
filter->addDependency(vertex.get());

auto output = std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), filter.get());
auto output =
std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), filter.get(), schemas_);
output->addDependency(filter.get());
plan.addNode(std::move(indexScan));
plan.addNode(std::move(vertex));
Expand Down
3 changes: 2 additions & 1 deletion src/storage/test/IndexWithTTLTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,8 @@ TEST(IndexWithTTLTest, LookupTagIndexWithTTLExpired) {
req.set_parts(std::move(parts));
std::vector<std::string> returnCols;
returnCols.emplace_back(kSrc);
returnCols.emplace_back(kTag);
returnCols.emplace_back(kRank);
returnCols.emplace_back(kDst);
req.set_return_columns(std::move(returnCols));
auto expr = RelationalExpression::makeNE(pool,
TagPropertyExpression::make(pool, "2021001", "c1"),
Expand Down
16 changes: 16 additions & 0 deletions src/storage/test/LookupIndexTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,10 @@ TEST_P(LookupIndexTest, TagIndexFilterTest) {
* +----------+-----------+
* |
* +----------+-----------+
* + IndexVertexNode +
* +----------+-----------+
* |
* +----------+-----------+
* + IndexScanNode +
* +----------+-----------+
**/
Expand Down Expand Up @@ -671,6 +675,10 @@ TEST_P(LookupIndexTest, TagIndexFilterTest) {
* +----------+-----------+
* |
* +----------+-----------+
* + IndexVertexNode +
* +----------+-----------+
* |
* +----------+-----------+
* + IndexScanNode +
* +----------+-----------+
**/
Expand Down Expand Up @@ -752,6 +760,10 @@ TEST_P(LookupIndexTest, EdgeIndexFilterTest) {
* +----------+-----------+
* |
* +----------+-----------+
* + IndexEdgeNode +
* +----------+-----------+
* |
* +----------+-----------+
* + IndexScanNode +
* +----------+-----------+
**/
Expand Down Expand Up @@ -845,6 +857,10 @@ TEST_P(LookupIndexTest, EdgeIndexFilterTest) {
* +----------+-----------+
* |
* +----------+-----------+
* + IndexEdgeNode +
* +----------+-----------+
* |
* +----------+-----------+
* + IndexScanNode +
* +----------+-----------+
**/
Expand Down

0 comments on commit f881c1f

Please sign in to comment.