Skip to content

Commit

Permalink
co/MultiValue: new class
Browse files Browse the repository at this point in the history
  • Loading branch information
MaxKellermann committed Dec 6, 2024
1 parent 20309f6 commit b43ba73
Show file tree
Hide file tree
Showing 3 changed files with 366 additions and 0 deletions.
118 changes: 118 additions & 0 deletions src/co/MultiValue.hxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// SPDX-License-Identifier: BSD-2-Clause
// Copyright CM4all GmbH
// author: Max Kellermann <[email protected]>

#pragma once

#include "Compat.hxx"
#include "util/IntrusiveList.hxx"

#include <cassert>
#include <optional>

namespace Co {

/**
* An awaitable that can be awaited on by multiple waiters. As soon
* as a value is set, it becomes ready and resumes all waiters.
*
* This object must remain valid until all waiters have been resumed
* (or canceled).
*
* This is similar to #MultiValue, but there is a return value and it
* cannot be reused.
*/
template<typename T>
class MultiValue final {
struct Awaitable final : IntrusiveListHook<IntrusiveHookMode::TRACK> {
MultiValue &multi;

std::coroutine_handle<> continuation;

[[nodiscard]]
explicit Awaitable(MultiValue &_multi) noexcept
:multi(_multi) {}

~Awaitable() noexcept {
if (is_linked()) {
assert(continuation);
assert(!continuation.done());

unlink();
}
}

Awaitable(const Awaitable &) = delete;
Awaitable &operator=(const Awaitable &) = delete;

[[nodiscard]]
bool await_ready() const noexcept {
assert(!is_linked());
assert(!continuation);

return multi.value.has_value();
}

void await_suspend(std::coroutine_handle<> _continuation) noexcept {
assert(!is_linked());
assert(!multi.value);
assert(!continuation);
assert(_continuation);
assert(!_continuation.done());

continuation = _continuation;

multi.waiters.push_back(*this);
}

T await_resume() noexcept {
assert(!is_linked());
assert(multi.value);

return *multi.value;
}
};

/**
* A list of suspended waiters.
*/
IntrusiveList<Awaitable> waiters;

std::optional<T> value;

public:
[[nodiscard]]
MultiValue() noexcept = default;

~MultiValue() noexcept {
assert(waiters.empty());
}

/**
* Creates a new awaitable
*/
[[nodiscard]]
auto operator co_await() noexcept {
return Awaitable{*this};
}

template<typename U>
void SetReady(U &&_value) noexcept {
assert(!value);

value.emplace(std::forward<U>(_value));

/* move the request list to the stack because the last
* resume() call may destruct this object */
auto tmp = std::move(waiters);

tmp.clear_and_dispose([](Awaitable *r){
assert(r->continuation);
assert(!r->continuation.done());

r->continuation.resume();
});
}
};

} // namespace Co
247 changes: 247 additions & 0 deletions test/co/TestMultiValue.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
// SPDX-License-Identifier: BSD-2-Clause
// Copyright CM4all GmbH
// author: Max Kellermann <[email protected]>

#include "co/MultiValue.hxx"
#include "co/Task.hxx"

#include <gtest/gtest.h>

template<typename T>
static Co::EagerTask<void>
Waiter(Co::MultiValue<T> &task, std::optional<T> &value_r)
{
assert(!value_r);
value_r = co_await task;
}

TEST(MultiValue, Nothing)
{
[[maybe_unused]]
Co::MultiValue<int> m;
}

TEST(MultiValue, ReadyNone)
{
Co::MultiValue<int> m;
m.SetReady(42);
}

TEST(MultiValue, ReadyEarly)
{
Co::MultiValue<int> m;
m.SetReady(42);

std::optional<int> value;
auto w = Waiter(m, value);

EXPECT_TRUE(value);
EXPECT_EQ(*value, 42);
}

TEST(MultiValue, ReadyLate)
{
Co::MultiValue<int> m;

std::optional<int> value;
auto w = Waiter(m, value);

EXPECT_FALSE(value);

m.SetReady(42);
EXPECT_TRUE(value);
EXPECT_EQ(*value, 42);
}

TEST(MultiValue, ResumeFour)
{
Co::MultiValue<int> m;
std::optional<int> values[4];

auto w0 = Waiter(m, values[0]);
auto w1 = Waiter(m, values[1]);

EXPECT_FALSE(values[0]);
EXPECT_FALSE(values[1]);
EXPECT_FALSE(values[2]);
EXPECT_FALSE(values[3]);

m.SetReady(42);

EXPECT_TRUE(values[0]);
EXPECT_TRUE(values[1]);
EXPECT_FALSE(values[2]);
EXPECT_FALSE(values[3]);

EXPECT_EQ(*values[0], 42);
EXPECT_EQ(*values[1], 42);

auto w2 = Waiter(m, values[2]);
auto w3 = Waiter(m, values[3]);

EXPECT_TRUE(values[0]);
EXPECT_TRUE(values[1]);
EXPECT_TRUE(values[2]);
EXPECT_TRUE(values[3]);

EXPECT_EQ(*values[0], 42);
EXPECT_EQ(*values[1], 42);
EXPECT_EQ(*values[2], 42);
EXPECT_EQ(*values[3], 42);
}

TEST(MultiValue, Cancel)
{
Co::MultiValue<int> m;
std::optional<int> values[6];

auto w0 = Waiter(m, values[0]);
auto w1 = Waiter(m, values[1]);

EXPECT_FALSE(values[0]);
EXPECT_FALSE(values[1]);
EXPECT_FALSE(values[2]);
EXPECT_FALSE(values[3]);
EXPECT_FALSE(values[4]);
EXPECT_FALSE(values[5]);

w0 = {};
w1 = {};

EXPECT_FALSE(values[0]);
EXPECT_FALSE(values[1]);
EXPECT_FALSE(values[2]);
EXPECT_FALSE(values[3]);
EXPECT_FALSE(values[4]);
EXPECT_FALSE(values[5]);

auto w2 = Waiter(m, values[2]);
auto w3 = Waiter(m, values[3]);

EXPECT_FALSE(values[0]);
EXPECT_FALSE(values[1]);
EXPECT_FALSE(values[2]);
EXPECT_FALSE(values[3]);
EXPECT_FALSE(values[4]);
EXPECT_FALSE(values[5]);

w3 = {};

EXPECT_FALSE(values[0]);
EXPECT_FALSE(values[1]);
EXPECT_FALSE(values[2]);
EXPECT_FALSE(values[3]);
EXPECT_FALSE(values[4]);
EXPECT_FALSE(values[5]);

m.SetReady(42);

EXPECT_FALSE(values[0]);
EXPECT_FALSE(values[1]);
EXPECT_TRUE(values[2]);
EXPECT_FALSE(values[3]);
EXPECT_FALSE(values[4]);
EXPECT_FALSE(values[5]);

EXPECT_EQ(*values[2], 42);

auto w4 = Waiter(m, values[4]);
auto w5 = Waiter(m, values[5]);

EXPECT_FALSE(values[0]);
EXPECT_FALSE(values[1]);
EXPECT_TRUE(values[2]);
EXPECT_FALSE(values[3]);
EXPECT_TRUE(values[4]);
EXPECT_TRUE(values[5]);

EXPECT_EQ(*values[2], 42);
EXPECT_EQ(*values[4], 42);
EXPECT_EQ(*values[5], 42);
}

template<typename T>
static Co::EagerTask<void>
CancelOtherTaskWaiter(Co::MultiValue<T> &task, std::optional<T> &value_r,
auto &cancel_task)
{
assert(!value_r);
auto value = co_await task;

cancel_task = {};
assert(!value_r);

value_r.emplace(std::move(value));
}

static Co::EagerTask<void>
CancelOtherTaskWaiter(auto &&task, bool &complete_r, auto &cancel_task)
{
assert(!complete_r);
co_await task;

cancel_task = {};

assert(!complete_r);
complete_r = true;
}

/**
* One resumed task cancels another (suspended) task.
*/
TEST(MultiValue, CancelInTask)
{
Co::MultiValue<int> m;

std::optional<int> values[2];
std::array<Co::EagerTask<void>, 2> waiters;
waiters[0] = CancelOtherTaskWaiter(m, values[0], waiters[1]);
waiters[1] = Waiter(m, values[1]);

EXPECT_FALSE(values[0]);
EXPECT_FALSE(values[1]);

m.SetReady(42);
EXPECT_TRUE(values[0]);
EXPECT_FALSE(values[1]);

EXPECT_EQ(*values[0], 42);
}

template<typename T>
static Co::EagerTask<void>
AwaitOtherTaskWaiter(Co::MultiValue<T> &task, std::optional<T> &value_r,
auto &other_task, std::optional<T> &other_value_r)
{
assert(!value_r);
assert(!other_value_r);

value_r = co_await task;

assert(!other_value_r);

other_task = Waiter(task, other_value_r);
assert(other_value_r);
}

/**
* One resumed task adds another waiter.
*/
TEST(MultiValue, AwaitInTask)
{
Co::MultiValue<int> m;

std::optional<int> values[2];
std::array<Co::EagerTask<void>, 2> waiters;
waiters[0] = AwaitOtherTaskWaiter(m, values[0], waiters[1], values[1]);

EXPECT_FALSE(values[0]);
EXPECT_FALSE(values[1]);

m.SetReady(42);
EXPECT_TRUE(values[0]);
EXPECT_TRUE(values[1]);

EXPECT_EQ(*values[0], 42);
EXPECT_EQ(*values[1], 42);
}
1 change: 1 addition & 0 deletions test/co/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ test(
'TestCoCache.cxx',
'TestMultiAwaitable.cxx',
'TestMultiResume.cxx',
'TestMultiValue.cxx',
include_directories: inc,
dependencies: [
gtest,
Expand Down

0 comments on commit b43ba73

Please sign in to comment.