Skip to content

Commit

Permalink
Segment auto-increasing id (vesoft-inc#721)
Browse files Browse the repository at this point in the history
#### What type of PR is this?
- [ ] bug
- [x] feature
- [ ] enhancement

#### What does this PR do?


#### Which issue(s)/PR(s) this PR relates to?

close vesoft-inc#3448
  
#### Special notes for your reviewer, ex. impact of this fix, etc:

#### Additional context:


#### Checklist:
- [x] Documentation affected (Please add the label if documentation needs to be modified.)
- [ ] Incompatible (If it is incompatible, please describe it and add corresponding label.)
- [ ] Need to cherry-pick (If need to cherry-pick to some branches, please label the destination version(s).)
- [x] Performance impacted: Consumes more CPU/Memory


#### Release notes:
Please confirm whether to reflect in release notes and how to describe:
> Support segment auto-increasing id                                                     `


Migrated from vesoft-inc#3550

Co-authored-by: jakevin <[email protected]>
  • Loading branch information
nebula-bots and jackwener authored Mar 22, 2022
1 parent 7588f1e commit 4355331
Show file tree
Hide file tree
Showing 23 changed files with 572 additions and 18 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ jobs:
- name: CTest
env:
ASAN_OPTIONS: fast_unwind_on_malloc=1
run: ctest -j $(($(nproc)/2+1)) --timeout 400 --output-on-failure
run: |
ctest -j $(($(nproc)/2+1)) --timeout 400 --output-on-failure -LE segment_id_test
ctest -j $(($(nproc)/2+1)) --timeout 400 --output-on-failure -L segment_id_test
working-directory: build/
timeout-minutes: 20
- name: Setup cluster
Expand Down Expand Up @@ -273,7 +275,9 @@ jobs:
- name: CTest
env:
ASAN_OPTIONS: fast_unwind_on_malloc=1
run: ctest -j $(($(nproc)/2+1)) --timeout 400 --output-on-failure
run: |
ctest -j $(($(nproc)/2+1)) --timeout 400 --output-on-failure -LE segment_id_test
ctest -j $(($(nproc)/2+1)) --timeout 400 --output-on-failure -L segment_id_test
working-directory: build/
timeout-minutes: 20
- name: Setup Cluster
Expand Down
15 changes: 15 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4037,6 +4037,21 @@ folly::Future<StatusOr<bool>> MetaClient::ingest(GraphSpaceID spaceId) {
return folly::async(func);
}

folly::Future<StatusOr<int64_t>> MetaClient::getSegmentId(int64_t length) {
auto req = cpp2::GetSegmentIdReq();
req.length_ref() = length;

folly::Promise<StatusOr<int64_t>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_getSegmentId(request); },
[](cpp2::GetSegmentIdResp&& resp) -> int64_t { return std::move(resp).get_segment_id(); },
std::move(promise),
true);
return future;
}

bool MetaClient::loadSessions() {
auto session_list = listSessions().get();
if (!session_list.ok()) {
Expand Down
13 changes: 11 additions & 2 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,14 @@ struct MetaClientOptions {
std::string rootPath_;
};

class MetaClient {
class BaseMetaClient {
public:
virtual folly::Future<StatusOr<int64_t>> getSegmentId(int64_t length) = 0;

virtual ~BaseMetaClient() = default;
};

class MetaClient : public BaseMetaClient {
FRIEND_TEST(ConfigManTest, MetaConfigManTest);
FRIEND_TEST(ConfigManTest, MockConfigTest);
FRIEND_TEST(ConfigManTest, RocksdbOptionsTest);
Expand All @@ -266,7 +273,7 @@ class MetaClient {
std::vector<HostAddr> addrs,
const MetaClientOptions& options = MetaClientOptions());

virtual ~MetaClient();
~MetaClient() override;

bool isMetadReady();

Expand Down Expand Up @@ -726,6 +733,8 @@ class MetaClient {

folly::Future<StatusOr<int64_t>> getWorkerId(std::string ipAddr);

folly::Future<StatusOr<int64_t>> getSegmentId(int64_t length) override;

HostAddr getMetaLeader() {
return leader_;
}
Expand Down
5 changes: 5 additions & 0 deletions src/common/id/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ nebula_add_library(
UUID.cpp
)

nebula_add_library(
segment_id_obj OBJECT
SegmentId.cpp
)

nebula_add_subdirectory(test)
70 changes: 70 additions & 0 deletions src/common/id/SegmentId.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "common/id/SegmentId.h"

namespace nebula {

StatusOr<int64_t> SegmentId::getId() {
std::lock_guard<std::mutex> guard(mutex_);

if (cur_ < segmentStart_ + step_ - 1) {
// non-block prefetch next segment
if (cur_ == segmentStart_ + (step_ / 2) - 1) {
asyncFetchSegment();
}
cur_ += 1;
} else { // cur == segment end
if (segmentStart_ >= nextSegmentStart_) {
// indicate asyncFetchSegment() failed or fetchSegment() slow
LOG(ERROR)
<< "segmentId asyncFetchSegment() failed or slow(step is too small), segmentStart_: "
<< segmentStart_ << ", nextSegmentStart_: " << nextSegmentStart_;
auto xRet = fetchSegment();
NG_RETURN_IF_ERROR(xRet);
nextSegmentStart_ = xRet.value();
}
segmentStart_ = nextSegmentStart_;
cur_ = segmentStart_;
}

return cur_;
}

void SegmentId::asyncFetchSegment() {
auto future = client_->getSegmentId(step_);
std::move(future).via(runner_).thenValue([this](StatusOr<int64_t> resp) {
NG_RETURN_IF_ERROR(resp);
if (!resp.value()) {
return Status::Error("asyncFetchSegment failed!");
}
this->nextSegmentStart_ = resp.value();
return Status::OK();
});
}

StatusOr<int64_t> SegmentId::fetchSegment() {
auto result = client_->getSegmentId(step_).get();

NG_RETURN_IF_ERROR(result);
return result.value();
}

Status SegmentId::init(int64_t step) {
step_ = step;
if (step < kMinStep_) {
return Status::Error("Step is too small");
}

auto xRet = fetchSegment();
NG_RETURN_IF_ERROR(xRet);

segmentStart_ = xRet.value();
cur_ = segmentStart_ - 1;

return Status::OK();
}

} // namespace nebula
52 changes: 52 additions & 0 deletions src/common/id/SegmentId.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#ifndef COMMON_ID_SEGMENTINCR_H_
#define COMMON_ID_SEGMENTINCR_H_

#include "clients/meta/MetaClient.h"

namespace nebula {
// Segment auto-increase id
class SegmentId {
public:
SegmentId(meta::BaseMetaClient* client, folly::Executor* runner)
: client_(client), runner_(runner) {}

~SegmentId() = default;

SegmentId(const SegmentId&) = delete;

SegmentId& operator=(const SegmentId&) = delete;

Status init(int64_t step);

StatusOr<int64_t> getId();

private:
// when get id fast or fetchSegment() slow or fail, getSegmentId() directly.
// In this case, the new segment will overlap with the old one.
void asyncFetchSegment();

StatusOr<int64_t> fetchSegment();

std::mutex mutex_;

int64_t cur_{-1};
int64_t step_{-1};

int64_t segmentStart_{-2};
int64_t nextSegmentStart_{-2};

// ensure the segment can be use for 10 mins.
// 2 segment = max insert/secs * 600. segment = 400000 * 600 / 2 = 120000000
static inline constexpr int64_t kMinStep_{120000000};

meta::BaseMetaClient* client_;
folly::Executor* runner_;
};
} // namespace nebula

#endif // COMMON_ID_SEGMENTINCR_H_
27 changes: 27 additions & 0 deletions src/common/id/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,30 @@ nebula_add_test(
${PROXYGEN_LIBRARIES}
${THRIFT_LIBRARIES}
)

nebula_add_test(
NAME segment_id_test
SOURCES SegmentIdTest.cpp
OBJECTS
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:segment_id_obj>
$<TARGET_OBJECTS:thrift_obj>
LIBRARIES
gtest
gtest_main
${THRIFT_LIBRARIES}
)

nebula_add_test(
NAME segment_id_bm
SOURCES SegmentIdBenchmark.cpp
OBJECTS

$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:segment_id_obj>
$<TARGET_OBJECTS:thrift_obj>
LIBRARIES
follybenchmark
boost_regex
${THRIFT_LIBRARIES}
)
22 changes: 22 additions & 0 deletions src/common/id/test/MockMetaClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "common/id/SegmentId.h"

namespace nebula {
class MockMetaClient : public meta::BaseMetaClient {
public:
folly::Future<StatusOr<int64_t>> getSegmentId(int64_t length) override {
std::lock_guard<std::mutex> guard(mutex_);
auto future = folly::makeFuture(cur_.load());
cur_.fetch_add(length);
return future;
}

private:
std::mutex mutex_;
std::atomic_int64_t cur_{0};
};
} // namespace nebula
86 changes: 86 additions & 0 deletions src/common/id/test/SegmentIdBenchmark.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include <folly/Benchmark.h>

#include "MockMetaClient.h"

class MockMetaClient : public nebula::meta::BaseMetaClient {
public:
folly::Future<nebula::StatusOr<int64_t>> getSegmentId(int64_t length) override {
std::lock_guard<std::mutex> guard(mutex_);
auto future = folly::makeFuture(cur_.load());
cur_.fetch_add(length);
return future;
}

private:
std::mutex mutex_;
std::atomic_int64_t cur_{0};
};

size_t SegmentIdCurrencyTest(size_t iters, int threadNum) {
constexpr size_t ops = 1000000UL;
int step = 120000000;

MockMetaClient metaClient = MockMetaClient();
std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager(
PriorityThreadManager::newPriorityThreadManager(32));
threadManager->setNamePrefix("executor");
threadManager->start();

nebula::SegmentId generator = nebula::SegmentId(&metaClient, threadManager.get());
nebula::Status status = generator.init(step);
ASSERT(status.ok());

auto proc = [&]() {
auto n = iters * ops;
for (auto i = 0UL; i < n; i++) {
nebula::StatusOr<int64_t> id = generator.getId();
folly::doNotOptimizeAway(id);
}
};

std::vector<std::thread> threads;
threads.reserve(threadNum);

for (int i = 0; i < threadNum; i++) {
threads.emplace_back(std::thread(proc));
}

for (int i = 0; i < threadNum; i++) {
threads[i].join();
}

return iters * ops * threadNum;
}

BENCHMARK_NAMED_PARAM_MULTI(SegmentIdCurrencyTest, 1_thread, 1)
BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SegmentIdCurrencyTest, 2_thread, 2)
BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SegmentIdCurrencyTest, 4_thread, 4)
BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SegmentIdCurrencyTest, 8_thread, 8)
BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SegmentIdCurrencyTest, 16_thread, 16)
BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SegmentIdCurrencyTest, 32_thread, 32)
BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SegmentIdCurrencyTest, 64_thread, 64)

int main(int argc, char** argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
folly::runBenchmarks();
return 0;
}

/* Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
============================================================================
nebula/src/common/id/test/SegmentIdBenchmark.cpprelative time/iter iters/s
============================================================================
SegmentIdCurrencyTest(1_thread) 64.23ns 15.57M
SegmentIdCurrencyTest(2_thread) 55.35% 116.03ns 8.62M
SegmentIdCurrencyTest(4_thread) 49.64% 129.39ns 7.73M
SegmentIdCurrencyTest(8_thread) 37.61% 170.76ns 5.86M
SegmentIdCurrencyTest(16_thread) 34.91% 183.98ns 5.44M
SegmentIdCurrencyTest(32_thread) 27.57% 232.98ns 4.29M
SegmentIdCurrencyTest(64_thread) 21.77% 295.00ns 3.39M
============================================================================
*/
Loading

0 comments on commit 4355331

Please sign in to comment.