From 49c68b83a009bf14d2df21f4d62601e875ff198f Mon Sep 17 00:00:00 2001 From: Taylor Cramer Date: Tue, 24 Sep 2024 01:04:05 +0000 Subject: [PATCH] pw_async2: Use Waker from Context in OnceSender MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change-Id: I7afb81d88066c252d4260c923917c29b0a2a7ad8 Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/237512 Reviewed-by: Wyatt Hepler Lint: Lint 🤖 Docs-Not-Needed: Taylor Cramer Presubmit-Verified: CQ Bot Account Pigweed-Auto-Submit: Taylor Cramer Commit-Queue: Auto-Submit --- pw_async2/once_sender_test.cc | 30 +++---- pw_async2/public/pw_async2/once_sender.h | 102 ++++++++++------------- 2 files changed, 53 insertions(+), 79 deletions(-) diff --git a/pw_async2/once_sender_test.cc b/pw_async2/once_sender_test.cc index a92a93a98f..eaaba0bff8 100644 --- a/pw_async2/once_sender_test.cc +++ b/pw_async2/once_sender_test.cc @@ -49,24 +49,21 @@ class ValueTask : public Task { public: ValueTask(bool use_make_constructor = true) : use_make_constructor_(use_make_constructor) {} - Poll<> DoPend(Context& ctx) override { + Poll<> DoPend(Context& cx) override { if (!receiver_) { if (use_make_constructor_) { auto [send, recv] = - pw::async2::MakeOnceSenderAndReceiver( - ctx.GetWaker(pw::async2::WaitReason::Unspecified())); + pw::async2::MakeOnceSenderAndReceiver(); sender_.emplace(std::move(send)); receiver_.emplace(std::move(recv)); } else { receiver_.emplace(); sender_.emplace(); pw::async2::InitializeOnceSenderAndReceiver( - sender_.value(), - receiver_.value(), - ctx.GetWaker(pw::async2::WaitReason::Unspecified())); + sender_.value(), receiver_.value()); } } - Poll> poll = receiver_.value().Pend(); + Poll> poll = receiver_.value().Pend(cx); if (poll.IsReady()) { ready_value_.emplace(std::move(poll.value())); return Ready(); @@ -163,25 +160,21 @@ class VectorTask : public Task { VectorTask(bool use_make_constructor = true) : use_make_constructor_(use_make_constructor) {} - Poll<> DoPend(Context& ctx) override { + Poll<> DoPend(Context& cx) override { if (!receiver_) { if (use_make_constructor_) { auto [send, recv] = - pw::async2::MakeOnceRefSenderAndReceiver>( - value_, ctx.GetWaker(pw::async2::WaitReason::Unspecified())); + pw::async2::MakeOnceRefSenderAndReceiver>(value_); sender_.emplace(std::move(send)); receiver_.emplace(std::move(recv)); } else { sender_.emplace(); receiver_.emplace(); pw::async2::InitializeOnceRefSenderAndReceiver>( - sender_.value(), - receiver_.value(), - value_, - ctx.GetWaker(pw::async2::WaitReason::Unspecified())); + sender_.value(), receiver_.value(), value_); } } - Poll poll = receiver_->Pend(); + Poll poll = receiver_->Pend(cx); if (poll.IsReady()) { ready_value_.emplace(poll.value()); return Ready(); @@ -264,15 +257,14 @@ TEST(OnceSender, DestroyingOnceRefSenderCausesReceiverPendToReturnCancelled) { class MoveOnlyRefTask : public Task { public: - Poll<> DoPend(Context& ctx) override { + Poll<> DoPend(Context& cx) override { if (!receiver_) { auto [send, recv] = - pw::async2::MakeOnceRefSenderAndReceiver( - value_, ctx.GetWaker(pw::async2::WaitReason::Unspecified())); + pw::async2::MakeOnceRefSenderAndReceiver(value_); sender_.emplace(std::move(send)); receiver_.emplace(std::move(recv)); } - Poll poll = receiver_->Pend(); + Poll poll = receiver_->Pend(cx); if (poll.IsReady()) { ready_value_.emplace(poll.value()); return Ready(); diff --git a/pw_async2/public/pw_async2/once_sender.h b/pw_async2/public/pw_async2/once_sender.h index 84df7e0aa6..4eb265e5d5 100644 --- a/pw_async2/public/pw_async2/once_sender.h +++ b/pw_async2/public/pw_async2/once_sender.h @@ -14,6 +14,7 @@ #pragma once #include "pw_async2/dispatcher.h" +#include "pw_async2/dispatcher_base.h" #include "pw_function/function.h" #include "pw_sync/mutex.h" @@ -64,31 +65,27 @@ class OnceReceiver final { /// Returns `Ready` with a result containing the value once the value has been /// assigned. If the sender is destroyed before sending a value, a `Cancelled` /// result will be returned. - Poll> Pend() { + Poll> Pend(Context& cx) { std::lock_guard lock(sender_receiver_lock()); if (value_.has_value()) { return Ready(std::move(*value_)); } else if (!sender_) { return Ready(Status::Cancelled()); } + waker_ = cx.GetWaker(WaitReason::Unspecified()); return Pending(); } private: template - friend std::pair, OnceReceiver> MakeOnceSenderAndReceiver( - Waker); + friend std::pair, OnceReceiver> MakeOnceSenderAndReceiver(); template friend void InitializeOnceSenderAndReceiver(OnceSender& sender, - OnceReceiver& receiver, - Waker waker); + OnceReceiver& receiver); friend class OnceSender; - // `waker` is the `Waker` to be awoken when the value is assigned. - OnceReceiver(Waker waker) : waker_(std::move(waker)) {} - OnceSender* sender_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr; - std::optional value_ PW_GUARDED_BY(sender_receiver_lock()); + std::optional value_ PW_GUARDED_BY(sender_receiver_lock()) = std::nullopt; Waker waker_; }; @@ -145,12 +142,10 @@ class OnceSender final { private: template - friend std::pair, OnceReceiver> MakeOnceSenderAndReceiver( - Waker); + friend std::pair, OnceReceiver> MakeOnceSenderAndReceiver(); template friend void InitializeOnceSenderAndReceiver(OnceSender& sender, - OnceReceiver& receiver, - Waker waker); + OnceReceiver& receiver); friend class OnceReceiver; OnceSender(OnceReceiver* receiver) : receiver_(receiver) {} @@ -167,28 +162,21 @@ OnceReceiver::~OnceReceiver() { } /// Construct a pair of `OnceSender` and `OnceReceiver`. -/// @param waker The `Waker` to be awoken when the value is sent. template -std::pair, OnceReceiver> MakeOnceSenderAndReceiver( - Waker waker) { - OnceReceiver receiver(std::move(waker)); - OnceSender sender(&receiver); - { - std::lock_guard lock(sender_receiver_lock()); - receiver.sender_ = &sender; - } - return std::make_pair(std::move(sender), std::move(receiver)); +std::pair, OnceReceiver> MakeOnceSenderAndReceiver() { + std::pair, OnceReceiver> send_recv; + InitializeOnceSenderAndReceiver(send_recv.first, send_recv.second); + return send_recv; } /// Initialize a pair of `OnceSender` and `OnceReceiver`. -/// @param waker The `Waker` to be awoken when the value is sent. template void InitializeOnceSenderAndReceiver(OnceSender& sender, - OnceReceiver& receiver, - Waker waker) { - std::lock_guard lock(sender_receiver_lock()); + OnceReceiver& receiver) + PW_NO_LOCK_SAFETY_ANALYSIS { + // Disable lock analysis because these are fresh sender/receiver pairs and + // do not require a lock to initialize; receiver.sender_ = &sender; - receiver.waker_ = std::move(waker); sender.receiver_ = &receiver; } @@ -215,7 +203,6 @@ class OnceRefReceiver final { sender_->receiver_ = this; } value_ = other.value_; - cancelled_ = other.cancelled_; waker_ = std::move(other.waker_); } @@ -228,35 +215,35 @@ class OnceRefReceiver final { /// Returns `Ready` with an `ok` status when the modification of the /// reference is complete. If the sender is destroyed before updating the /// reference, a `cancelled` status is returned. - Poll Pend() { + Poll Pend(Context& cx) { std::lock_guard lock(sender_receiver_lock()); - if (cancelled_) { - return Ready(Status::Cancelled()); - } - if (waker_.IsEmpty()) { + if (value_ == nullptr) { return Ready(OkStatus()); } + if (sender_ == nullptr) { + return Ready(Status::Cancelled()); + } + waker_ = cx.GetWaker(WaitReason::Unspecified()); return Pending(); } private: template friend std::pair, OnceRefReceiver> - MakeOnceRefSenderAndReceiver(U&, Waker); + MakeOnceRefSenderAndReceiver(U&); template friend void InitializeOnceRefSenderAndReceiver(OnceRefSender& sender, OnceRefReceiver& receiver, - U& value, - Waker waker); + U& value); friend class OnceRefSender; - // `waker` is the `Waker` to be awoken when the value is assigned. - OnceRefReceiver(T& value, Waker waker) - : value_(&value), waker_(std::move(waker)) {} + OnceRefReceiver(T& value) : value_(&value) {} - OnceRefSender* sender_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr; + // Pointer to the value to be modified. Set to `nullptr` once modification + // is complete. T* value_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr; - bool cancelled_ PW_GUARDED_BY(sender_receiver_lock()) = false; + // Pointer to the modifier. Set to `nullptr` if the sender disappears. + OnceRefSender* sender_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr; Waker waker_; }; @@ -273,10 +260,7 @@ class OnceRefSender final { std::lock_guard lock(sender_receiver_lock()); if (receiver_) { receiver_->sender_ = nullptr; - if (!receiver_->waker_.IsEmpty()) { - receiver_->cancelled_ = true; - std::move(receiver_->waker_).Wake(); - } + std::move(receiver_->waker_).Wake(); } } @@ -300,6 +284,7 @@ class OnceRefSender final { *(receiver_->value_) = value; std::move(receiver_->waker_).Wake(); receiver_->sender_ = nullptr; + receiver_->value_ = nullptr; receiver_ = nullptr; } } @@ -311,6 +296,7 @@ class OnceRefSender final { *(receiver_->value_) = std::move(value); std::move(receiver_->waker_).Wake(); receiver_->sender_ = nullptr; + receiver_->value_ = nullptr; receiver_ = nullptr; } } @@ -334,6 +320,7 @@ class OnceRefSender final { if (receiver_) { std::move(receiver_->waker_).Wake(); receiver_->sender_ = nullptr; + receiver_->value_ = nullptr; receiver_ = nullptr; } } @@ -341,12 +328,11 @@ class OnceRefSender final { private: template friend std::pair, OnceRefReceiver> - MakeOnceRefSenderAndReceiver(U&, Waker); + MakeOnceRefSenderAndReceiver(U&); template friend void InitializeOnceRefSenderAndReceiver(OnceRefSender& sender, OnceRefReceiver& receiver, - U& value, - Waker waker); + U& value); friend class OnceRefReceiver; OnceRefSender(OnceRefReceiver* receiver) : receiver_(receiver) {} @@ -366,30 +352,26 @@ OnceRefReceiver::~OnceRefReceiver() { /// @param[in] value The reference to be mutated by the sender. It must mot be /// read or modified until either `OnceRefSender` indicates `Ready()` or /// either the `OnceRefSender` or `OnceRefReceiver` is destroyed. -/// @param[in] waker The `Waker` to be awoken when the reference is updated. template std::pair, OnceRefReceiver> MakeOnceRefSenderAndReceiver( - T& value, Waker waker) { - OnceRefReceiver receiver(value, std::move(waker)); - OnceRefSender sender(&receiver); - return std::make_pair(std::move(sender), std::move(receiver)); + T& value) { + std::pair, OnceRefReceiver> send_recv; + InitializeOnceRefSenderAndReceiver(send_recv.first, send_recv.second, value); + return send_recv; } /// Initialize a pair of `OnceRefSender` and `OnceRefReceiver`. /// @param[in] value The reference to be mutated by the sender. It must mot be /// read or modified until either `OnceRefSender` indicates `Ready()` or /// either the `OnceRefSender` or `OnceRefReceiver` is destroyed. -/// @param[in] waker The `Waker` to be awoken when the reference is updated. template void InitializeOnceRefSenderAndReceiver(OnceRefSender& sender, OnceRefReceiver& receiver, - T& value, - Waker waker) { - std::lock_guard lock(sender_receiver_lock()); + T& value) PW_NO_LOCK_SAFETY_ANALYSIS { + // Disable lock analysis because these are fresh sender/receiver pairs and + // do not require a lock to initialize; receiver.sender_ = &sender; receiver.value_ = &value; - receiver.cancelled_ = false; - receiver.waker_ = std::move(waker); sender.receiver_ = &receiver; }