From a3d5543318d043bc4d40e001a1b2c03fc76b4646 Mon Sep 17 00:00:00 2001 From: jimingquan Date: Sun, 10 Apr 2022 23:02:45 +0800 Subject: [PATCH] add gflags --- src/graph/executor/algo/BFSShortestPathExecutor.cpp | 4 ++-- src/graph/executor/algo/MultiShortestPathExecutor.cpp | 5 ++--- src/graph/executor/algo/ProduceAllPathsExecutor.cpp | 5 ++--- src/graph/service/GraphFlags.cpp | 1 + src/graph/service/GraphFlags.h | 1 + 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/graph/executor/algo/BFSShortestPathExecutor.cpp b/src/graph/executor/algo/BFSShortestPathExecutor.cpp index d25edec2171..79f5439d81c 100644 --- a/src/graph/executor/algo/BFSShortestPathExecutor.cpp +++ b/src/graph/executor/algo/BFSShortestPathExecutor.cpp @@ -6,6 +6,7 @@ #include "graph/planner/plan/Algo.h" +DECLARE_int32(num_operator_threads); namespace nebula { namespace graph { folly::Future BFSShortestPathExecutor::execute() { @@ -126,9 +127,8 @@ folly::Future BFSShortestPathExecutor::conjunctPath() { if (meetVids.empty()) { return Status::OK(); } - static size_t NUM_PROC = 5; size_t totalSize = meetVids.size(); - size_t batchSize = totalSize / NUM_PROC; + size_t batchSize = totalSize / static_cast(FLAGS_num_operator_threads); std::vector batchVids; batchVids.reserve(batchSize); std::vector> futures; diff --git a/src/graph/executor/algo/MultiShortestPathExecutor.cpp b/src/graph/executor/algo/MultiShortestPathExecutor.cpp index 6d403744a7d..5ab58fa4847 100644 --- a/src/graph/executor/algo/MultiShortestPathExecutor.cpp +++ b/src/graph/executor/algo/MultiShortestPathExecutor.cpp @@ -4,7 +4,7 @@ #include "graph/executor/algo/MultiShortestPathExecutor.h" #include "graph/planner/plan/Algo.h" - +DECLARE_int32(num_operator_threads); namespace nebula { namespace graph { folly::Future MultiShortestPathExecutor::execute() { @@ -161,8 +161,7 @@ DataSet MultiShortestPathExecutor::doConjunct(Interims::iterator startIter, } folly::Future MultiShortestPathExecutor::conjunctPath(bool oddStep) { - static size_t NUM_PROC = 5; - size_t batchSize = leftPaths_.size() / NUM_PROC; + size_t batchSize = leftPaths_.size() / static_cast(FLAGS_num_operator_threads); std::vector> futures; size_t i = 0; diff --git a/src/graph/executor/algo/ProduceAllPathsExecutor.cpp b/src/graph/executor/algo/ProduceAllPathsExecutor.cpp index 6adfdf83b9b..15c0e1a473d 100644 --- a/src/graph/executor/algo/ProduceAllPathsExecutor.cpp +++ b/src/graph/executor/algo/ProduceAllPathsExecutor.cpp @@ -3,7 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/algo/ProduceAllPathsExecutor.h" #include "graph/planner/plan/Algo.h" - +DECLARE_int32(num_operator_threads); namespace nebula { namespace graph { folly::Future ProduceAllPathsExecutor::execute() { @@ -126,8 +126,7 @@ DataSet ProduceAllPathsExecutor::doConjunct(Interims::iterator startIter, } folly::Future ProduceAllPathsExecutor::conjunctPath() { - static size_t NUM_PROC = 5; - auto batchSize = leftPaths_.size() / NUM_PROC; + auto batchSize = leftPaths_.size() / static_cast(FLAGS_num_operator_threads); std::vector> futures; size_t i = 0; diff --git a/src/graph/service/GraphFlags.cpp b/src/graph/service/GraphFlags.cpp index 6532a480700..e4a3a71ea36 100644 --- a/src/graph/service/GraphFlags.cpp +++ b/src/graph/service/GraphFlags.cpp @@ -18,6 +18,7 @@ DEFINE_int32(num_netio_threads, "The number of networking threads, 0 for number of physical CPU cores"); DEFINE_int32(num_accept_threads, 1, "Number of threads to accept incoming connections"); DEFINE_int32(num_worker_threads, 0, "Number of threads to execute user queries"); +DEFINE_int32(num_operator_threads, 5, "Number of threads to execute a single operator"); DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option"); DEFINE_int32(listen_backlog, 1024, "Backlog of the listen socket"); DEFINE_string(listen_netdev, "any", "The network device to listen on"); diff --git a/src/graph/service/GraphFlags.h b/src/graph/service/GraphFlags.h index 340a3f7e009..cddd8c02e83 100644 --- a/src/graph/service/GraphFlags.h +++ b/src/graph/service/GraphFlags.h @@ -15,6 +15,7 @@ DECLARE_int32(session_reclaim_interval_secs); DECLARE_int32(num_netio_threads); DECLARE_int32(num_accept_threads); DECLARE_int32(num_worker_threads); +DECLARE_int32(num_operator_threads); DECLARE_bool(reuse_port); DECLARE_int32(listen_backlog); DECLARE_string(listen_netdev);