Skip to content

Commit

Permalink
Refactor findpath (#4095)
Browse files Browse the repository at this point in the history
* optimize bfs

* optimizer allpath

* optimizer multi-shortestpath

* optimizer multi shortest path

* fix validate unit test

* add some comment

* fix error

* fix bfs error

* add comment

* delete conjunct

* add findpath unit test

* delete useless file

* delete log

* remove check

* multi thread executor

* single shortest multi thread

* add some testcases

* add gflags

* fix bfs error

* address comment
  • Loading branch information
nevermore3 authored Apr 13, 2022
1 parent c0811dc commit 072af05
Show file tree
Hide file tree
Showing 31 changed files with 2,310 additions and 3,979 deletions.
3 changes: 1 addition & 2 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ nebula_add_library(
query/TraverseExecutor.cpp
query/AppendVerticesExecutor.cpp
query/RollUpApplyExecutor.cpp
algo/ConjunctPathExecutor.cpp
algo/BFSShortestPathExecutor.cpp
algo/ProduceSemiShortestPathExecutor.cpp
algo/MultiShortestPathExecutor.cpp
algo/ProduceAllPathsExecutor.cpp
algo/CartesianProductExecutor.cpp
algo/SubgraphExecutor.cpp
Expand Down
10 changes: 3 additions & 7 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@
#include "graph/executor/admin/ZoneExecutor.h"
#include "graph/executor/algo/BFSShortestPathExecutor.h"
#include "graph/executor/algo/CartesianProductExecutor.h"
#include "graph/executor/algo/ConjunctPathExecutor.h"
#include "graph/executor/algo/MultiShortestPathExecutor.h"
#include "graph/executor/algo/ProduceAllPathsExecutor.h"
#include "graph/executor/algo/ProduceSemiShortestPathExecutor.h"
#include "graph/executor/algo/SubgraphExecutor.h"
#include "graph/executor/logic/ArgumentExecutor.h"
#include "graph/executor/logic/LoopExecutor.h"
Expand Down Expand Up @@ -447,11 +446,8 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kBFSShortest: {
return pool->add(new BFSShortestPathExecutor(node, qctx));
}
case PlanNode::Kind::kProduceSemiShortestPath: {
return pool->add(new ProduceSemiShortestPathExecutor(node, qctx));
}
case PlanNode::Kind::kConjunctPath: {
return pool->add(new ConjunctPathExecutor(node, qctx));
case PlanNode::Kind::kMultiShortestPath: {
return pool->add(new MultiShortestPathExecutor(node, qctx));
}
case PlanNode::Kind::kProduceAllPaths: {
return pool->add(new ProduceAllPathsExecutor(node, qctx));
Expand Down
239 changes: 202 additions & 37 deletions src/graph/executor/algo/BFSShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
@@ -1,52 +1,217 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
// Copyright (c) 2022 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#include "graph/executor/algo/BFSShortestPathExecutor.h"

#include "graph/planner/plan/Algo.h"

DECLARE_int32(num_operator_threads);
namespace nebula {
namespace graph {
// Runs one step of the bidirectional search.
// On the first step it seeds allRightEdges_ with one placeholder (default-constructed) edge per
// distinct vid read from rightVidVar(). It then schedules buildPath() for both directions on
// runner(), joins the two sides with conjunctPath(), increments step_, and finishes with the
// rows accumulated in currentDs_.
folly::Future<Status> BFSShortestPathExecutor::execute() {
SCOPED_TIMER(&execTime_);
// NOTE(review): the next four statements (`bfs`, `iter`, and the two VLOGs) look like lines
// removed by this commit that the diff view shows without -/+ markers; neither `bfs` nor
// `iter` is used below — confirm against the real file.
auto* bfs = asNode<BFSShortestPath>(node());
auto iter = ectx_->getResult(bfs->inputVar()).iter();
VLOG(1) << "current: " << node()->outputVar();
VLOG(1) << "input: " << bfs->inputVar();
pathNode_ = asNode<BFSShortestPath>(node());

if (step_ == 1) {
// Add a new layer to allRightEdges_ keyed by the vids from rightVidVar(), so that
// conjunctPath() can look those vids up in allRightEdges_[step_ - 1].
allRightEdges_.emplace_back();
auto& currentEdges = allRightEdges_.back();
auto rIter = ectx_->getResult(pathNode_->rightVidVar()).iter();
std::unordered_set<Value> rightVids;
for (; rIter->valid(); rIter->next()) {
auto& vid = rIter->getColumn(0);
// Insert each vid only once.
if (rightVids.emplace(vid).second) {
Edge dummy;
currentEdges.emplace(vid, std::move(dummy));
}
}
}

// Expand the left (reverse == false) and right (reverse == true) sides as separate tasks.
std::vector<folly::Future<Status>> futures;
auto leftFuture = folly::via(runner(), [this]() { return buildPath(false); });
auto rightFuture = folly::via(runner(), [this]() { return buildPath(true); });
futures.emplace_back(std::move(leftFuture));
futures.emplace_back(std::move(rightFuture));

return folly::collect(futures)
.via(runner())
.thenValue([this](auto&& status) {
// The statuses returned by buildPath() are not inspected here.
UNUSED(status);
return conjunctPath();
})
.thenValue([this](auto&& status) {
// The status returned by conjunctPath() is not inspected here.
UNUSED(status);
step_++;
DataSet ds;
ds.colNames = pathNode_->colNames();
// Hand the rows over to the result and leave currentDs_.rows empty.
ds.rows.swap(currentDs_.rows);
return finish(ResultBuilder().value(Value(std::move(ds))).build());
});
}

// Consumes the edges of one direction (left when `reverse` is false, right when it is true),
// records them as a new layer keyed by destination vid, and publishes the distinct destination
// vids as the input for the next expansion of that direction.
//
// On the first step every edge is kept and its source is marked as visited; on later steps
// edges whose destination was already visited are skipped.
Status BFSShortestPathExecutor::buildPath(bool reverse) {
  auto iter = reverse ? ectx_->getResult(pathNode_->rightInputVar()).iter()
                      : ectx_->getResult(pathNode_->leftInputVar()).iter();
  DCHECK(!!iter);

  auto& seenVids = reverse ? rightVisitedVids_ : leftVisitedVids_;
  auto& layers = reverse ? allRightEdges_ : allLeftEdges_;
  layers.emplace_back();
  auto& layer = layers.back();

  const auto inputSize = iter->size();
  seenVids.reserve(seenVids.size() + inputSize);

  std::unordered_set<Value> frontier;
  frontier.reserve(inputSize);
  DataSet nextVids;
  nextVids.colNames = {nebula::kVid};

  const bool firstStep = (step_ == 1);
  while (iter->valid()) {
    auto edgeVal = iter->getEdge();
    if (LIKELY(edgeVal.isEdge())) {
      auto& edge = edgeVal.getEdge();
      auto dst = edge.dst;
      bool keep = true;
      if (firstStep) {
        // The first step never filters; it only remembers where the search started.
        seenVids.emplace(edge.src);
      } else if (seenVids.count(dst) > 0) {
        keep = false;
      }
      if (keep) {
        // Each destination is emitted once as a next-step vid, but every edge is recorded.
        if (frontier.emplace(dst).second) {
          nextVids.rows.emplace_back(Row({dst}));
        }
        layer.emplace(std::move(dst), std::move(edge));
      }
    }
    iter->next();
  }

  // Publish the vids to expand in the next step.
  const auto& nextVidVar = reverse ? pathNode_->rightVidVar() : pathNode_->leftVidVar();
  ectx_->setResult(nextVidVar, ResultBuilder().value(std::move(nextVids)).build());

  // Destinations reached in this step count as visited from now on.
  seenVids.insert(std::make_move_iterator(frontier.begin()),
                  std::make_move_iterator(frontier.end()));
  return Status::OK();
}

// Finds the vids where the left and right searches meet and builds the resulting paths.
//
// The newest left layer is matched first against allRightEdges_[step_ - 1] (oddStep == true).
// Only if nothing is found, and step_ * 2 does not exceed pathNode_->steps(), is it matched
// against the newest right layer (oddStep == false). The meet vids are then split into batches,
// each batch is handed to doConjunct() on runner(), and the produced rows are appended to
// currentDs_.
//
// Returns Status::OK() immediately when the two sides do not meet.
folly::Future<Status> BFSShortestPathExecutor::conjunctPath() {
  const auto& leftEdges = allLeftEdges_.back();
  const auto& preRightEdges = allRightEdges_[step_ - 1];
  std::unordered_set<Value> meetVids;
  bool oddStep = true;
  for (const auto& edge : leftEdges) {
    if (preRightEdges.find(edge.first) != preRightEdges.end()) {
      meetVids.emplace(edge.first);
    }
  }
  if (meetVids.empty() && step_ * 2 <= pathNode_->steps()) {
    const auto& rightEdges = allRightEdges_.back();
    for (const auto& edge : leftEdges) {
      if (rightEdges.find(edge.first) != rightEdges.end()) {
        meetVids.emplace(edge.first);
        oddStep = false;
      }
    }
  }
  if (meetVids.empty()) {
    return Status::OK();
  }

  const size_t totalSize = meetVids.size();
  // A zero or negative flag value must not cause a division by zero or wrap to a huge count.
  const size_t threadNum =
      FLAGS_num_operator_threads > 0 ? static_cast<size_t>(FLAGS_num_operator_threads) : 1;
  // Round up so the batch size is at least 1 and at most `threadNum` batches are produced.
  const size_t batchSize = (totalSize + threadNum - 1) / threadNum;

  std::vector<Value> batchVids;
  batchVids.reserve(batchSize);
  std::vector<folly::Future<DataSet>> futures;
  futures.reserve(threadNum);
  size_t i = 0;
  for (const auto& vid : meetVids) {
    batchVids.push_back(vid);
    ++i;
    if (i == totalSize || batchVids.size() == batchSize) {
      auto future = folly::via(runner(), [this, vids = std::move(batchVids), oddStep]() {
        return doConjunct(vids, oddStep);
      });
      futures.emplace_back(std::move(future));
      // Put the moved-from vector back into a known state before reusing it.
      batchVids.clear();
      batchVids.reserve(batchSize);
    }
  }

  return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) {
    for (auto& resp : resps) {
      currentDs_.append(std::move(resp));
    }
    return Status::OK();
  });
}

// Builds the result rows for one batch of meet vids.
// For every left path and every right path stored under the same key, the left path is copied
// and reversed, the right path is appended to it, and the combined path is emitted as a
// single-value row of the returned DataSet.
//
// meetVids: the vids passed on to createPath() for both sides.
// oddStep:  forwarded unchanged to createPath().
DataSet BFSShortestPathExecutor::doConjunct(const std::vector<Value>& meetVids,
bool oddStep) const {
DataSet ds;
ds.colNames = node()->colNames();
// NOTE(review): the statements from here through `return finish(...)` look like the removed
// pre-refactor body that the diff view shows without -/+ markers (they use `iter` and
// `visited_`, which this function does not declare, and return from a DataSet function via
// finish()). The colNames assignment above may belong to the same residue — confirm against
// the real file.
std::unordered_multimap<Value, Value> interim;

for (; iter->valid(); iter->next()) {
auto edgeVal = iter->getEdge();
if (!edgeVal.isEdge()) {
continue;
}
auto& edge = edgeVal.getEdge();
auto visited = visited_.find(edge.dst) != visited_.end();
if (visited) {
continue;
}

// save the starts.
visited_.emplace(edge.src);
VLOG(1) << "dst: " << edge.dst << " edge: " << edge;
interim.emplace(edge.dst, std::move(edgeVal));
}
for (auto& kv : interim) {
auto dst = std::move(kv.first);
auto edge = std::move(kv.second);
Row row;
row.values.emplace_back(dst);
row.values.emplace_back(std::move(edge));
ds.rows.emplace_back(std::move(row));
visited_.emplace(dst);
}
return finish(ResultBuilder().value(Value(std::move(ds))).build());
// Paths on each side, keyed by the vid they start from.
auto leftPaths = createPath(meetVids, false, oddStep);
auto rightPaths = createPath(meetVids, true, oddStep);
for (auto& leftPath : leftPaths) {
// Pair this left path with every right path stored under the same key.
auto range = rightPaths.equal_range(leftPath.first);
for (auto& rightPath = range.first; rightPath != range.second; ++rightPath) {
// Work on a copy so the stored left path can be reused for the next right path.
Path result = leftPath.second;
result.reverse();
result.append(rightPath->second);
Row row;
row.emplace_back(std::move(result));
ds.rows.emplace_back(std::move(row));
}
}
return ds;
}

std::unordered_multimap<Value, Path> BFSShortestPathExecutor::createPath(
std::vector<Value> meetVids, bool reverse, bool oddStep) const {
std::unordered_multimap<Value, Path> result;
auto& allEdges = reverse ? allRightEdges_ : allLeftEdges_;
for (auto& meetVid : meetVids) {
Path start;
start.src = Vertex(meetVid, {});
std::vector<Path> interimPaths = {std::move(start)};
auto iter = (reverse && oddStep) ? allEdges.rbegin() + 1 : allEdges.rbegin();
auto end = reverse ? allEdges.rend() - 1 : allEdges.rend();
if (iter == end) {
result.emplace(meetVid, std::move(start));
return result;
}
for (; iter != end; ++iter) {
std::vector<Path> temp;
for (auto& interimPath : interimPaths) {
Value id;
if (interimPath.steps.empty()) {
id = interimPath.src.vid;
} else {
id = interimPath.steps.back().dst.vid;
}
auto range = iter->equal_range(id);
for (auto edgeIter = range.first; edgeIter != range.second; ++edgeIter) {
Path p = interimPath;
auto& edge = edgeIter->second;
p.steps.emplace_back(Step(Vertex(edge.src, {}), -edge.type, edge.name, edge.ranking, {}));

if (iter == end - 1) {
result.emplace(p.src.vid, std::move(p));
} else {
temp.emplace_back(std::move(p));
}
} // edgeIter
} // interimPath
interimPaths = std::move(temp);
}
}
return result;
}

} // namespace graph
} // namespace nebula
59 changes: 53 additions & 6 deletions src/graph/executor/algo/BFSShortestPathExecutor.h
Original file line number Diff line number Diff line change
@@ -1,15 +1,45 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
// Copyright (c) 2022 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#ifndef GRAPH_EXECUTOR_ALGO_BFSSHORTESTPATHEXECUTOR_H_
#define GRAPH_EXECUTOR_ALGO_BFSSHORTESTPATHEXECUTOR_H_

#include "graph/executor/Executor.h"

// BFSShortestPath has two inputs: GetNeighbors(From) and GetNeighbors(To).
// It has two main functions:
// First : Get the next vid for GetNeighbors to expand
// Second: Extract edges from GetNeighbors to form path, concatenate the path(From) and the path(To)
// into a complete path
//
//
// Functions:
// `buildPath`: extract edges from GetNeighbors put it into allLeftEdges or allRightEdges
// and set the vid that needs to be expanded in the next step
//
// `conjunctPath`: concatenate the path(From) and the path(To) into a complete path.
// At each step, allLeftEdges is first matched against the previous step of allRightEdges,
// and then against the current step of allRightEdges
// Eg. a->b->c->d
// firstStep: allLeftEdges [<b, a->b>] allRightEdges [<c, d<-c>], can't find common vid
// secondStep: allLeftEdges [<b, a->b>, <c, b->c>] allRightEdges [<b, c<-b>, <c, d<-c>]
// we should use allLeftEdges(secondStep) to match allRightEdges(firstStep) first
// if find common vid, no need to match allRightEdges(secondStep)
//
// Member:
// `allLeftEdges_` : is an array; each element of the array is a hash table
// hash table
// KEY : the VID of the vertex
// VALUE : edges visited at the current step (the destination is KEY)
//
// `allRightEdges_` : same as allLeftEdges_
//
// `leftVisitedVids_` : keep already visited vid to avoid repeated visits (left)
// `rightVisitedVids_` : keep already visited vid to avoid repeated visits (right)
// `currentDs_`: keep the paths matched in current step
namespace nebula {
namespace graph {
class BFSShortestPath;
class BFSShortestPathExecutor final : public Executor {
public:
BFSShortestPathExecutor(const PlanNode* node, QueryContext* qctx)
Expand All @@ -18,7 +48,24 @@ class BFSShortestPathExecutor final : public Executor {
folly::Future<Status> execute() override;

private:
std::unordered_set<Value> visited_;
Status buildPath(bool reverse);

folly::Future<Status> conjunctPath();

DataSet doConjunct(const std::vector<Value>& meetVids, bool oddStep) const;

std::unordered_multimap<Value, Path> createPath(std::vector<Value> meetVids,
bool reverse,
bool oddStep) const;

private:
const BFSShortestPath* pathNode_{nullptr};
size_t step_{1};
std::unordered_set<Value> leftVisitedVids_;
std::unordered_set<Value> rightVisitedVids_;
std::vector<std::unordered_multimap<Value, Edge>> allLeftEdges_;
std::vector<std::unordered_multimap<Value, Edge>> allRightEdges_;
DataSet currentDs_;
};
} // namespace graph
} // namespace nebula
Expand Down
Loading

0 comments on commit 072af05

Please sign in to comment.