Skip to content

Commit

Permalink
all checked
Browse files Browse the repository at this point in the history
  • Loading branch information
codesigner committed Jan 4, 2023
1 parent a7326df commit b97c1ae
Show file tree
Hide file tree
Showing 23 changed files with 229 additions and 253 deletions.
7 changes: 5 additions & 2 deletions src/common/datatypes/DataSetOps-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ inline constexpr protocol::TType Cpp2Ops<nebula::DataSet>::thriftType() {

template <class Protocol>
uint32_t Cpp2Ops<nebula::DataSet>::write(Protocol* proto, nebula::DataSet const* obj) {
// we do not turn on memory tracker here, when the DataSet object is creating & inserting, it is
// in Processor::process(), where memory tracker is turned on. so we think that is enough.
uint32_t xfer = 0;

xfer += proto->writeStructBegin("DataSet");
Expand All @@ -73,8 +75,9 @@ void Cpp2Ops<nebula::DataSet>::read(Protocol* proto, nebula::DataSet* obj) {
// memory usage during decode a StorageResponse should be mostly occupied
// by DataSet (see interface/storage.thrift), turn on memory check here.
//
// MemoryTrackerVerified: throw std::bad_alloc has verified, can be captured in
// StorageClientBase::getResponse's onError
// MemoryTrackerVerified:
// throw std::bad_alloc has verified, can be captured in StorageClientBase::getResponse's
// onError
nebula::memory::MemoryCheckGuard guard;

apache::thrift::detail::ProtocolReaderStructReadState<Protocol> readState;
Expand Down
4 changes: 4 additions & 0 deletions src/common/memory/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ void MemoryTracker::free(int64_t size) {
MemoryStats::instance().free(size);
}

bool MemoryTracker::isOn() {
return MemoryStats::instance().throwOnMemoryExceeded();
}

void MemoryTracker::allocImpl(int64_t size, bool) {
MemoryStats::instance().alloc(size);
}
Expand Down
3 changes: 3 additions & 0 deletions src/common/memory/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ struct MemoryTracker {
/// This function should be called after memory deallocation.
static void free(int64_t size);

/// Test state of memory tracker, return true if memory tracker is turned on, otherwise false.
static bool isOn();

private:
static void allocImpl(int64_t size, bool throw_if_memory_exceeded);
};
Expand Down
3 changes: 3 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,9 @@ Status Executor::open() {
}

Status Executor::close() {
// MemoryTrackerVerified
DCHECK(memory::MemoryTracker::isOn()) << "MemoryTracker is off";

ProfilingStats stats;
stats.totalDurationInUs = totalDuration_.elapsedInUSec();
stats.rows = numRows_;
Expand Down
10 changes: 4 additions & 6 deletions src/graph/executor/Executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ class Executor : private boost::noncopyable, private cpp::NonMovable {
folly::Future<Status> error(Status status) const;

static Status memoryExceededStatus() {
return Status::Error("Graph Error: GRAPH_MEMORY_EXCEEDED(%d)",
static_cast<int32_t>(nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED));
return Status::GraphMemoryExceeded(
"(%d)", static_cast<int32_t>(nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED));
}

protected:
Expand Down Expand Up @@ -159,8 +159,8 @@ auto Executor::runMultiJobs(ScatterFunc &&scatter, GatherFunc &&gather, Iterator
futures.emplace_back(folly::via(
runner(),
[begin, end, tmpIter = iter->copy(), f = std::move(scatter)]() mutable -> ScatterResult {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
// MemoryCheckGuard verified
// Since not all iterators are linear, so iterates to the begin pos
size_t tmp = 0;
for (; tmpIter->valid() && tmp < begin; ++tmp) {
Expand All @@ -174,9 +174,7 @@ auto Executor::runMultiJobs(ScatterFunc &&scatter, GatherFunc &&gather, Iterator
}

// Gather all results and do post works
return folly::collect(futures)
.via(runner())
.thenValue(std::move(gather));
return folly::collect(futures).via(runner()).thenValue(std::move(gather));
}
} // namespace graph
} // namespace nebula
Expand Down
11 changes: 5 additions & 6 deletions src/graph/executor/algo/BFSShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ DECLARE_int32(num_operator_threads);
namespace nebula {
namespace graph {
folly::Future<Status> BFSShortestPathExecutor::execute() {
DCHECK(memory::MemoryTracker::isOn()) << "MemoryTracker is off";

SCOPED_TIMER(&execTime_);
pathNode_ = asNode<BFSShortestPath>(node());
terminateEarlyVar_ = pathNode_->terminateEarlyVar();
Expand All @@ -30,10 +32,12 @@ folly::Future<Status> BFSShortestPathExecutor::execute() {

std::vector<folly::Future<Status>> futures;
auto leftFuture = folly::via(runner(), [this]() {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
return buildPath(false);
});
auto rightFuture = folly::via(runner(), [this]() {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
return buildPath(true);
});
Expand Down Expand Up @@ -111,6 +115,7 @@ Status BFSShortestPathExecutor::buildPath(bool reverse) {
currentEdges.emplace(std::move(dst), std::move(edge));
}
}

// set nextVid
const auto& nextVidVar = reverse ? pathNode_->rightVidVar() : pathNode_->leftVidVar();
ectx_->setResult(nextVidVar, ResultBuilder().value(std::move(nextStepVids)).build());
Expand Down Expand Up @@ -170,12 +175,6 @@ folly::Future<Status> BFSShortestPathExecutor::conjunctPath() {
currentDs_.append(std::move(resp));
}
return Status::OK();
})
.thenError(
folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) { return folly::makeFuture<Status>(memoryExceededStatus()); })
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<Status>(std::runtime_error(e.what()));
});
}

Expand Down
172 changes: 82 additions & 90 deletions src/graph/executor/algo/BatchShortestPath.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,28 @@ namespace graph {
folly::Future<Status> BatchShortestPath::execute(const HashSet& startVids,
const HashSet& endVids,
DataSet* result) {
// MemoryTrackerVerified
DCHECK(memory::MemoryTracker::isOn()) << "MemoryTracker is off";

size_t rowSize = init(startVids, endVids);
std::vector<folly::Future<Status>> futures;
futures.reserve(rowSize);
for (size_t rowNum = 0; rowNum < rowSize; ++rowNum) {
resultDs_[rowNum].colNames = pathNode_->colNames();
futures.emplace_back(shortestPath(rowNum, 1));
}
return folly::collect(futures)
.via(qctx_->rctx()->runner())
.thenValue([this, result](auto&& resps) {
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
NG_RETURN_IF_ERROR(resp);
}
result->colNames = pathNode_->colNames();
for (auto& ds : resultDs_) {
result->append(std::move(ds));
}
return Status::OK();
})
.thenError(folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) {
return folly::makeFuture<Status>(Executor::memoryExceededStatus());
})
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<Status>(std::runtime_error(e.what()));
});
return folly::collect(futures).via(runner()).thenValue([this, result](auto&& resps) {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
NG_RETURN_IF_ERROR(resp);
}
result->colNames = pathNode_->colNames();
for (auto& ds : resultDs_) {
result->append(std::move(ds));
}
return Status::OK();
});
}

size_t BatchShortestPath::init(const HashSet& startVids, const HashSet& endVids) {
Expand Down Expand Up @@ -106,8 +101,9 @@ folly::Future<Status> BatchShortestPath::shortestPath(size_t rowNum, size_t step
futures.emplace_back(getNeighbors(rowNum, stepNum, false));
futures.emplace_back(getNeighbors(rowNum, stepNum, true));
return folly::collect(futures)
.via(qctx_->rctx()->runner())
.via(runner())
.thenValue([this, rowNum, stepNum](auto&& resps) {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
if (!resp.ok()) {
Expand All @@ -116,6 +112,10 @@ folly::Future<Status> BatchShortestPath::shortestPath(size_t rowNum, size_t step
}
return handleResponse(rowNum, stepNum);
})
// This thenError is necessary to catch bad_alloc, seems the returned future
// is related to two routines: getNeighbors, handleResponse, each of them launch some task in
// separate thread, if any one of routine throw bad_alloc, fail the query, will cause another
// to run on a maybe already released BatchShortestPath object
.thenError(folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) {
return folly::makeFuture<Status>(Executor::memoryExceededStatus());
Expand Down Expand Up @@ -151,18 +151,12 @@ folly::Future<Status> BatchShortestPath::getNeighbors(size_t rowNum, size_t step
-1,
nullptr,
nullptr)
.via(qctx_->rctx()->runner())
.via(runner())
.thenValue([this, rowNum, reverse, stepNum, getNbrTime](auto&& resp) {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
addStats(resp, stepNum, getNbrTime.elapsedInUSec(), reverse);
return buildPath(rowNum, std::move(resp), reverse);
})
.thenError(folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) {
return folly::makeFuture<Status>(Executor::memoryExceededStatus());
})
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<Status>(std::runtime_error(e.what()));
});
}

Expand Down Expand Up @@ -281,21 +275,29 @@ Status BatchShortestPath::doBuildPath(size_t rowNum, GetNeighborsIter* iter, boo

folly::Future<Status> BatchShortestPath::handleResponse(size_t rowNum, size_t stepNum) {
return folly::makeFuture(Status::OK())
.via(qctx_->rctx()->runner())
.via(runner())
.thenValue([this, rowNum](auto&& status) {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;

// odd step
UNUSED(status);
return conjunctPath(rowNum, true);
})
.thenValue([this, rowNum, stepNum](auto&& terminate) {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;

// even Step
if (terminate || stepNum * 2 > maxStep_) {
return folly::makeFuture<bool>(true);
}
return conjunctPath(rowNum, false);
})
.thenValue([this, rowNum, stepNum](auto&& result) {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;

if (result || stepNum * 2 >= maxStep_) {
return folly::makeFuture<Status>(Status::OK());
}
Expand All @@ -319,13 +321,6 @@ folly::Future<Status> BatchShortestPath::handleResponse(size_t rowNum, size_t st
leftPathMap.clear();
rightPathMap.clear();
return shortestPath(rowNum, stepNum + 1);
})
.thenError(folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) {
return folly::makeFuture<Status>(Executor::memoryExceededStatus());
})
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<Status>(std::runtime_error(e.what()));
});
}

Expand Down Expand Up @@ -379,64 +374,61 @@ folly::Future<bool> BatchShortestPath::conjunctPath(size_t rowNum, bool oddStep)
}

auto future = getMeetVids(rowNum, oddStep, meetVids);
return future.via(qctx_->rctx()->runner())
.thenValue([this, rowNum, oddStep](auto&& vertices) {
memory::MemoryCheckGuard guard;
if (vertices.empty()) {
return false;
}
robin_hood::unordered_flat_map<Value, Value, std::hash<Value>> verticesMap;
for (auto& vertex : vertices) {
verticesMap[vertex.getVertex().vid] = std::move(vertex);
return future.via(runner()).thenValue([this, rowNum, oddStep](auto&& vertices) {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;

if (vertices.empty()) {
return false;
}
robin_hood::unordered_flat_map<Value, Value, std::hash<Value>> verticesMap;
for (auto& vertex : vertices) {
verticesMap[vertex.getVertex().vid] = std::move(vertex);
}
auto& terminationMap = terminationMaps_[rowNum];
auto& leftPathMaps = currentLeftPathMaps_[rowNum];
auto& rightPathMaps = oddStep ? preRightPathMaps_[rowNum] : currentRightPathMaps_[rowNum];
for (const auto& leftPathMap : leftPathMaps) {
auto findCommonVid = rightPathMaps.find(leftPathMap.first);
if (findCommonVid == rightPathMaps.end()) {
continue;
}
auto findCommonVertex = verticesMap.find(findCommonVid->first);
if (findCommonVertex == verticesMap.end()) {
continue;
}
auto& rightPaths = findCommonVid->second;
for (const auto& srcPaths : leftPathMap.second) {
auto range = terminationMap.equal_range(srcPaths.first);
if (range.first == range.second) {
continue;
}
auto& terminationMap = terminationMaps_[rowNum];
auto& leftPathMaps = currentLeftPathMaps_[rowNum];
auto& rightPathMaps = oddStep ? preRightPathMaps_[rowNum] : currentRightPathMaps_[rowNum];
for (const auto& leftPathMap : leftPathMaps) {
auto findCommonVid = rightPathMaps.find(leftPathMap.first);
if (findCommonVid == rightPathMaps.end()) {
continue;
}
auto findCommonVertex = verticesMap.find(findCommonVid->first);
if (findCommonVertex == verticesMap.end()) {
continue;
}
auto& rightPaths = findCommonVid->second;
for (const auto& srcPaths : leftPathMap.second) {
auto range = terminationMap.equal_range(srcPaths.first);
if (range.first == range.second) {
continue;
}
for (const auto& dstPaths : rightPaths) {
for (auto found = range.first; found != range.second; ++found) {
if (found->second.first == dstPaths.first) {
if (singleShortest_ && !found->second.second) {
break;
}
doConjunctPath(
srcPaths.second, dstPaths.second, findCommonVertex->second, rowNum);
found->second.second = false;
}
for (const auto& dstPaths : rightPaths) {
for (auto found = range.first; found != range.second; ++found) {
if (found->second.first == dstPaths.first) {
if (singleShortest_ && !found->second.second) {
break;
}
doConjunctPath(srcPaths.second, dstPaths.second, findCommonVertex->second, rowNum);
found->second.second = false;
}
}
}
// update terminationMap
for (auto iter = terminationMap.begin(); iter != terminationMap.end();) {
if (!iter->second.second) {
iter = terminationMap.erase(iter);
} else {
++iter;
}
}
if (terminationMap.empty()) {
return true;
}
return false;
})
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<bool>(std::runtime_error(e.what()));
});
}
}
// update terminationMap
for (auto iter = terminationMap.begin(); iter != terminationMap.end();) {
if (!iter->second.second) {
iter = terminationMap.erase(iter);
} else {
++iter;
}
}
if (terminationMap.empty()) {
return true;
}
return false;
});
}

void BatchShortestPath::doConjunctPath(const std::vector<CustomPath>& leftPaths,
Expand Down
Loading

0 comments on commit b97c1ae

Please sign in to comment.