Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add non-blocking functions for MPMCQueue #5311

Merged
merged 4 commits into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 53 additions & 22 deletions dbms/src/Common/MPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,56 +74,80 @@ class MPMCQueue
destruct(getObj(read_pos));
}

/// Block util:
/// Block until:
/// 1. Pop succeeds with a valid T: return true.
/// 2. The queue is cancelled or finished: return false.
bool pop(T & obj)
ALWAYS_INLINE bool pop(T & obj)
{
return popObj(obj);
return popObj<true>(obj);
}

/// Besides all conditions mentioned at `pop`, `tryPop` will return false if `timeout` is exceeded.
/// Besides all conditions mentioned at `pop`, `popTimeout` will return false if `timeout` is exceeded.
template <typename Duration>
bool tryPop(T & obj, const Duration & timeout)
ALWAYS_INLINE bool popTimeout(T & obj, const Duration & timeout)
{
/// std::condition_variable::wait_until will always use system_clock.
auto deadline = std::chrono::system_clock::now() + timeout;
return popObj(obj, &deadline);
return popObj<true>(obj, &deadline);
}

/// Block util:
/// Non-blocking function.
/// Return true if pop succeed.
/// else return false.
ALWAYS_INLINE bool tryPop(T & obj)
{
return popObj<false>(obj);
}

/// Block until:
/// 1. Push succeeds and return true.
/// 2. The queue is cancelled and return false.
/// 3. The queue has finished and return false.
template <typename U>
ALWAYS_INLINE bool push(U && u)
{
return pushObj(std::forward<U>(u));
return pushObj<true>(std::forward<U>(u));
}

/// Besides all conditions mentioned at `push`, `tryPush` will return false if `timeout` is exceeded.
/// Besides all conditions mentioned at `push`, `pushTimeout` will return false if `timeout` is exceeded.
template <typename U, typename Duration>
ALWAYS_INLINE bool tryPush(U && u, const Duration & timeout)
ALWAYS_INLINE bool pushTimeout(U && u, const Duration & timeout)
{
/// std::condition_variable::wait_until will always use system_clock.
auto deadline = std::chrono::system_clock::now() + timeout;
return pushObj(std::forward<U>(u), &deadline);
return pushObj<true>(std::forward<U>(u), &deadline);
}

/// Non-blocking function.
/// Return true if push succeed.
/// else return false.
template <typename U>
ALWAYS_INLINE bool tryPush(U && u)
{
return pushObj<false>(std::forward<U>(u));
}

/// The same as `push` except it will construct the object in place.
template <typename... Args>
ALWAYS_INLINE bool emplace(Args &&... args)
{
return emplaceObj(nullptr, std::forward<Args>(args)...);
return emplaceObj<true>(nullptr, std::forward<Args>(args)...);
}

/// The same as `tryPush` except it will construct the object in place.
/// The same as `pushTimeout` except it will construct the object in place.
template <typename... Args, typename Duration>
ALWAYS_INLINE bool tryEmplace(Args &&... args, const Duration & timeout)
ALWAYS_INLINE bool emplaceTimeout(Args &&... args, const Duration & timeout)
{
/// std::condition_variable::wait_until will always use system_clock.
auto deadline = std::chrono::system_clock::now() + timeout;
return emplaceObj(&deadline, std::forward<Args>(args)...);
return emplaceObj<true>(&deadline, std::forward<Args>(args)...);
}

/// The same as `tryPush` except it will construct the object in place.
template <typename... Args>
ALWAYS_INLINE bool tryEmplace(Args &&... args)
{
return emplaceObj<false>(nullptr, std::forward<Args>(args)...);
}

/// Cancel a NORMAL queue will wake up all blocking readers and writers.
Expand Down Expand Up @@ -233,6 +257,7 @@ class MPMCQueue
}
}

template <bool need_wait>
bool popObj(T & res, const TimePoint * deadline = nullptr)
gengliqi marked this conversation as resolved.
Show resolved Hide resolved
{
#ifdef __APPLE__
Expand All @@ -248,7 +273,10 @@ class MPMCQueue

std::unique_lock lock(mu);

wait(lock, reader_head, node, pred, deadline);
if constexpr (need_wait)
gengliqi marked this conversation as resolved.
Show resolved Hide resolved
{
gengliqi marked this conversation as resolved.
Show resolved Hide resolved
wait(lock, reader_head, node, pred, deadline);
}

if (!isCancelled() && read_pos < write_pos)
{
Expand All @@ -272,7 +300,7 @@ class MPMCQueue
return false;
}

template <typename F>
template <bool need_wait, typename F>
bool assignObj(const TimePoint * deadline, F && assigner)
gengliqi marked this conversation as resolved.
Show resolved Hide resolved
{
#ifdef __APPLE__
Expand All @@ -286,7 +314,10 @@ class MPMCQueue

std::unique_lock lock(mu);

wait(lock, writer_head, node, pred, deadline);
if constexpr (need_wait)
{
wait(lock, writer_head, node, pred, deadline);
}

gengliqi marked this conversation as resolved.
Show resolved Hide resolved
/// double check status after potential wait
/// check write_pos because timeouted will also reach here.
Expand All @@ -305,16 +336,16 @@ class MPMCQueue
return false;
}

template <typename U>
template <bool need_wait, typename U>
ALWAYS_INLINE bool pushObj(U && u, const TimePoint * deadline = nullptr)
{
return assignObj(deadline, [&](void * addr) { new (addr) T(std::forward<U>(u)); });
return assignObj<need_wait>(deadline, [&](void * addr) { new (addr) T(std::forward<U>(u)); });
}

template <typename... Args>
template <bool need_wait, typename... Args>
ALWAYS_INLINE bool emplaceObj(const TimePoint * deadline, Args &&... args)
{
return assignObj(deadline, [&](void * addr) { new (addr) T(std::forward<Args>(args)...); });
return assignObj<need_wait>(deadline, [&](void * addr) { new (addr) T(std::forward<Args>(args)...); });
}

ALWAYS_INLINE bool isNormal() const
Expand Down
25 changes: 14 additions & 11 deletions dbms/src/Common/tests/gtest_mpmc_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,14 @@ class MPMCQueueTest : public ::testing::Test
void testCannotTryPush(MPMCQueue<T> & queue)
{
auto old_size = queue.size();
auto res = queue.tryPush(ValueHelper<T>::make(-1), std::chrono::microseconds(1));
auto new_size = queue.size();
if (res)
bool ok1 = queue.tryPush(ValueHelper<T>::make(-1));
auto new_size1 = queue.size();
bool ok2 = queue.pushTimeout(ValueHelper<T>::make(-1), std::chrono::microseconds(1));
auto new_size2 = queue.size();
if (ok1 || ok2)
throw TiFlashTestException("Should push fail");
if (old_size != new_size)
throw TiFlashTestException(fmt::format("Size changed from {} to {} without push", old_size, new_size));
if (old_size != new_size1 || old_size != new_size2)
throw TiFlashTestException(fmt::format("Size changed from {} to {} and {} without push", old_size, new_size1, new_size2));
}

template <typename T>
Expand All @@ -124,12 +126,14 @@ class MPMCQueueTest : public ::testing::Test
{
auto old_size = queue.size();
T res;
bool ok = queue.tryPop(res, std::chrono::microseconds(1));
auto new_size = queue.size();
if (ok)
bool ok1 = queue.tryPop(res);
auto new_size1 = queue.size();
bool ok2 = queue.popTimeout(res, std::chrono::microseconds(1));
auto new_size2 = queue.size();
if (ok1 || ok2)
throw TiFlashTestException("Should pop fail");
if (old_size != new_size)
throw TiFlashTestException(fmt::format("Size changed from {} to {} without pop", old_size, new_size));
if (old_size != new_size1 || old_size != new_size2)
throw TiFlashTestException(fmt::format("Size changed from {} to {} and {} without pop", old_size, new_size1, new_size2));
}

template <typename T>
Expand Down Expand Up @@ -474,7 +478,6 @@ class MPMCQueueTest : public ::testing::Test
throwOrMove(std::move(rhs));
}


ThrowInjectable & operator=(ThrowInjectable && rhs)
{
if (this != &rhs)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Common/tests/mpmc_queue_perftest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ struct Helper<MPMCQueue<T>>
template <typename U>
static void pushOneTo(MPMCQueue<T> & queue, U && data)
{
queue.tryPush(std::forward<U>(data), std::chrono::milliseconds(1));
queue.pushTimeout(std::forward<U>(data), std::chrono::milliseconds(1));
}
};

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ void ExchangeReceiverBase<RPCContext>::reactor(const std::vector<Request> & asyn
for (Int32 i = 0; i < check_waiting_requests_freq; ++i)
{
AsyncHandler * handler = nullptr;
if (unlikely(!ready_requests.tryPop(handler, timeout)))
if (unlikely(!ready_requests.popTimeout(handler, timeout)))
break;

handler->handle();
Expand Down