Skip to content

Commit

Permalink
pw_async2: Use Waker from Context in OnceSender
Browse files Browse the repository at this point in the history
Change-Id: I7afb81d88066c252d4260c923917c29b0a2a7ad8
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/237512
Reviewed-by: Wyatt Hepler <[email protected]>
Lint: Lint 🤖 <[email protected]>
Docs-Not-Needed: Taylor Cramer <[email protected]>
Presubmit-Verified: CQ Bot Account <[email protected]>
Pigweed-Auto-Submit: Taylor Cramer <[email protected]>
Commit-Queue: Auto-Submit <[email protected]>
  • Loading branch information
cramertj authored and CQ Bot Account committed Sep 24, 2024
1 parent d95038e commit 49c68b8
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 79 deletions.
30 changes: 11 additions & 19 deletions pw_async2/once_sender_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,21 @@ class ValueTask : public Task {
public:
ValueTask(bool use_make_constructor = true)
: use_make_constructor_(use_make_constructor) {}
Poll<> DoPend(Context& ctx) override {
Poll<> DoPend(Context& cx) override {
if (!receiver_) {
if (use_make_constructor_) {
auto [send, recv] =
pw::async2::MakeOnceSenderAndReceiver<MoveOnlyValue>(
ctx.GetWaker(pw::async2::WaitReason::Unspecified()));
pw::async2::MakeOnceSenderAndReceiver<MoveOnlyValue>();
sender_.emplace(std::move(send));
receiver_.emplace(std::move(recv));
} else {
receiver_.emplace();
sender_.emplace();
pw::async2::InitializeOnceSenderAndReceiver<MoveOnlyValue>(
sender_.value(),
receiver_.value(),
ctx.GetWaker(pw::async2::WaitReason::Unspecified()));
sender_.value(), receiver_.value());
}
}
Poll<pw::Result<MoveOnlyValue>> poll = receiver_.value().Pend();
Poll<pw::Result<MoveOnlyValue>> poll = receiver_.value().Pend(cx);
if (poll.IsReady()) {
ready_value_.emplace(std::move(poll.value()));
return Ready();
Expand Down Expand Up @@ -163,25 +160,21 @@ class VectorTask : public Task {
VectorTask(bool use_make_constructor = true)
: use_make_constructor_(use_make_constructor) {}

Poll<> DoPend(Context& ctx) override {
Poll<> DoPend(Context& cx) override {
if (!receiver_) {
if (use_make_constructor_) {
auto [send, recv] =
pw::async2::MakeOnceRefSenderAndReceiver<pw::Vector<int>>(
value_, ctx.GetWaker(pw::async2::WaitReason::Unspecified()));
pw::async2::MakeOnceRefSenderAndReceiver<pw::Vector<int>>(value_);
sender_.emplace(std::move(send));
receiver_.emplace(std::move(recv));
} else {
sender_.emplace();
receiver_.emplace();
pw::async2::InitializeOnceRefSenderAndReceiver<pw::Vector<int>>(
sender_.value(),
receiver_.value(),
value_,
ctx.GetWaker(pw::async2::WaitReason::Unspecified()));
sender_.value(), receiver_.value(), value_);
}
}
Poll<pw::Status> poll = receiver_->Pend();
Poll<pw::Status> poll = receiver_->Pend(cx);
if (poll.IsReady()) {
ready_value_.emplace(poll.value());
return Ready();
Expand Down Expand Up @@ -264,15 +257,14 @@ TEST(OnceSender, DestroyingOnceRefSenderCausesReceiverPendToReturnCancelled) {

class MoveOnlyRefTask : public Task {
public:
Poll<> DoPend(Context& ctx) override {
Poll<> DoPend(Context& cx) override {
if (!receiver_) {
auto [send, recv] =
pw::async2::MakeOnceRefSenderAndReceiver<MoveOnlyValue>(
value_, ctx.GetWaker(pw::async2::WaitReason::Unspecified()));
pw::async2::MakeOnceRefSenderAndReceiver<MoveOnlyValue>(value_);
sender_.emplace(std::move(send));
receiver_.emplace(std::move(recv));
}
Poll<pw::Status> poll = receiver_->Pend();
Poll<pw::Status> poll = receiver_->Pend(cx);
if (poll.IsReady()) {
ready_value_.emplace(poll.value());
return Ready();
Expand Down
102 changes: 42 additions & 60 deletions pw_async2/public/pw_async2/once_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#pragma once

#include "pw_async2/dispatcher.h"
#include "pw_async2/dispatcher_base.h"
#include "pw_function/function.h"
#include "pw_sync/mutex.h"

Expand Down Expand Up @@ -64,31 +65,27 @@ class OnceReceiver final {
/// Returns `Ready` with a result containing the value once the value has been
/// assigned. If the sender is destroyed before sending a value, a `Cancelled`
/// result will be returned.
Poll<Result<T>> Pend() {
Poll<Result<T>> Pend(Context& cx) {
std::lock_guard lock(sender_receiver_lock());
if (value_.has_value()) {
return Ready(std::move(*value_));
} else if (!sender_) {
return Ready(Status::Cancelled());
}
waker_ = cx.GetWaker(WaitReason::Unspecified());
return Pending();
}

private:
template <typename U>
friend std::pair<OnceSender<U>, OnceReceiver<U>> MakeOnceSenderAndReceiver(
Waker);
friend std::pair<OnceSender<U>, OnceReceiver<U>> MakeOnceSenderAndReceiver();
template <typename U>
friend void InitializeOnceSenderAndReceiver(OnceSender<U>& sender,
OnceReceiver<U>& receiver,
Waker waker);
OnceReceiver<U>& receiver);
friend class OnceSender<T>;

// `waker` is the `Waker` to be awoken when the value is assigned.
OnceReceiver(Waker waker) : waker_(std::move(waker)) {}

OnceSender<T>* sender_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr;
std::optional<T> value_ PW_GUARDED_BY(sender_receiver_lock());
std::optional<T> value_ PW_GUARDED_BY(sender_receiver_lock()) = std::nullopt;
Waker waker_;
};

Expand Down Expand Up @@ -145,12 +142,10 @@ class OnceSender final {

private:
template <typename U>
friend std::pair<OnceSender<U>, OnceReceiver<U>> MakeOnceSenderAndReceiver(
Waker);
friend std::pair<OnceSender<U>, OnceReceiver<U>> MakeOnceSenderAndReceiver();
template <typename U>
friend void InitializeOnceSenderAndReceiver(OnceSender<U>& sender,
OnceReceiver<U>& receiver,
Waker waker);
OnceReceiver<U>& receiver);
friend class OnceReceiver<T>;

OnceSender(OnceReceiver<T>* receiver) : receiver_(receiver) {}
Expand All @@ -167,28 +162,21 @@ OnceReceiver<T>::~OnceReceiver() {
}

/// Construct a pair of `OnceSender` and `OnceReceiver`.
/// @param waker The `Waker` to be awoken when the value is sent.
template <typename T>
std::pair<OnceSender<T>, OnceReceiver<T>> MakeOnceSenderAndReceiver(
Waker waker) {
OnceReceiver<T> receiver(std::move(waker));
OnceSender<T> sender(&receiver);
{
std::lock_guard lock(sender_receiver_lock());
receiver.sender_ = &sender;
}
return std::make_pair(std::move(sender), std::move(receiver));
std::pair<OnceSender<T>, OnceReceiver<T>> MakeOnceSenderAndReceiver() {
std::pair<OnceSender<T>, OnceReceiver<T>> send_recv;
InitializeOnceSenderAndReceiver(send_recv.first, send_recv.second);
return send_recv;
}

/// Initialize a pair of `OnceSender` and `OnceReceiver`.
/// @param waker The `Waker` to be awoken when the value is sent.
template <typename T>
void InitializeOnceSenderAndReceiver(OnceSender<T>& sender,
OnceReceiver<T>& receiver,
Waker waker) {
std::lock_guard lock(sender_receiver_lock());
OnceReceiver<T>& receiver)
PW_NO_LOCK_SAFETY_ANALYSIS {
// Disable lock analysis because these are fresh sender/receiver pairs and
// do not require a lock to initialize;
receiver.sender_ = &sender;
receiver.waker_ = std::move(waker);
sender.receiver_ = &receiver;
}

Expand All @@ -215,7 +203,6 @@ class OnceRefReceiver final {
sender_->receiver_ = this;
}
value_ = other.value_;
cancelled_ = other.cancelled_;
waker_ = std::move(other.waker_);
}

Expand All @@ -228,35 +215,35 @@ class OnceRefReceiver final {
/// Returns `Ready` with an `ok` status when the modification of the
/// reference is complete. If the sender is destroyed before updating the
/// reference, a `cancelled` status is returned.
Poll<Status> Pend() {
Poll<Status> Pend(Context& cx) {
std::lock_guard lock(sender_receiver_lock());
if (cancelled_) {
return Ready(Status::Cancelled());
}
if (waker_.IsEmpty()) {
if (value_ == nullptr) {
return Ready(OkStatus());
}
if (sender_ == nullptr) {
return Ready(Status::Cancelled());
}
waker_ = cx.GetWaker(WaitReason::Unspecified());
return Pending();
}

private:
template <typename U>
friend std::pair<OnceRefSender<U>, OnceRefReceiver<U>>
MakeOnceRefSenderAndReceiver(U&, Waker);
MakeOnceRefSenderAndReceiver(U&);
template <typename U>
friend void InitializeOnceRefSenderAndReceiver(OnceRefSender<U>& sender,
OnceRefReceiver<U>& receiver,
U& value,
Waker waker);
U& value);
friend class OnceRefSender<T>;

// `waker` is the `Waker` to be awoken when the value is assigned.
OnceRefReceiver(T& value, Waker waker)
: value_(&value), waker_(std::move(waker)) {}
OnceRefReceiver(T& value) : value_(&value) {}

OnceRefSender<T>* sender_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr;
// Pointer to the value to be modified. Set to `nullptr` once modification
// is complete.
T* value_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr;
bool cancelled_ PW_GUARDED_BY(sender_receiver_lock()) = false;
// Pointer to the modifier. Set to `nullptr` if the sender disappears.
OnceRefSender<T>* sender_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr;
Waker waker_;
};

Expand All @@ -273,10 +260,7 @@ class OnceRefSender final {
std::lock_guard lock(sender_receiver_lock());
if (receiver_) {
receiver_->sender_ = nullptr;
if (!receiver_->waker_.IsEmpty()) {
receiver_->cancelled_ = true;
std::move(receiver_->waker_).Wake();
}
std::move(receiver_->waker_).Wake();
}
}

Expand All @@ -300,6 +284,7 @@ class OnceRefSender final {
*(receiver_->value_) = value;
std::move(receiver_->waker_).Wake();
receiver_->sender_ = nullptr;
receiver_->value_ = nullptr;
receiver_ = nullptr;
}
}
Expand All @@ -311,6 +296,7 @@ class OnceRefSender final {
*(receiver_->value_) = std::move(value);
std::move(receiver_->waker_).Wake();
receiver_->sender_ = nullptr;
receiver_->value_ = nullptr;
receiver_ = nullptr;
}
}
Expand All @@ -334,19 +320,19 @@ class OnceRefSender final {
if (receiver_) {
std::move(receiver_->waker_).Wake();
receiver_->sender_ = nullptr;
receiver_->value_ = nullptr;
receiver_ = nullptr;
}
}

private:
template <typename U>
friend std::pair<OnceRefSender<U>, OnceRefReceiver<U>>
MakeOnceRefSenderAndReceiver(U&, Waker);
MakeOnceRefSenderAndReceiver(U&);
template <typename U>
friend void InitializeOnceRefSenderAndReceiver(OnceRefSender<U>& sender,
OnceRefReceiver<U>& receiver,
U& value,
Waker waker);
U& value);
friend class OnceRefReceiver<T>;

OnceRefSender(OnceRefReceiver<T>* receiver) : receiver_(receiver) {}
Expand All @@ -366,30 +352,26 @@ OnceRefReceiver<T>::~OnceRefReceiver() {
/// @param[in] value The reference to be mutated by the sender. It must mot be
/// read or modified until either `OnceRefSender` indicates `Ready()` or
/// either the `OnceRefSender` or `OnceRefReceiver` is destroyed.
/// @param[in] waker The `Waker` to be awoken when the reference is updated.
template <typename T>
std::pair<OnceRefSender<T>, OnceRefReceiver<T>> MakeOnceRefSenderAndReceiver(
T& value, Waker waker) {
OnceRefReceiver<T> receiver(value, std::move(waker));
OnceRefSender<T> sender(&receiver);
return std::make_pair(std::move(sender), std::move(receiver));
T& value) {
std::pair<OnceRefSender<T>, OnceRefReceiver<T>> send_recv;
InitializeOnceRefSenderAndReceiver(send_recv.first, send_recv.second, value);
return send_recv;
}

/// Initialize a pair of `OnceRefSender` and `OnceRefReceiver`.
/// @param[in] value The reference to be mutated by the sender. It must mot be
/// read or modified until either `OnceRefSender` indicates `Ready()` or
/// either the `OnceRefSender` or `OnceRefReceiver` is destroyed.
/// @param[in] waker The `Waker` to be awoken when the reference is updated.
template <typename T>
void InitializeOnceRefSenderAndReceiver(OnceRefSender<T>& sender,
OnceRefReceiver<T>& receiver,
T& value,
Waker waker) {
std::lock_guard lock(sender_receiver_lock());
T& value) PW_NO_LOCK_SAFETY_ANALYSIS {
// Disable lock analysis because these are fresh sender/receiver pairs and
// do not require a lock to initialize;
receiver.sender_ = &sender;
receiver.value_ = &value;
receiver.cancelled_ = false;
receiver.waker_ = std::move(waker);
sender.receiver_ = &receiver;
}

Expand Down

0 comments on commit 49c68b8

Please sign in to comment.