From ab09049c909375a7998225d74adb2cae0e140282 Mon Sep 17 00:00:00 2001 From: Lalit Maganti Date: Sat, 2 Sep 2023 23:35:08 +0100 Subject: [PATCH] tp: decouple SpawnHandle from result receiver Stream This CL changes the API for spawning futures/streams to decouple the handle which controls whether or not the future continues from receiving the results of the future. This is useful as we may want an operation to complete even while we also are not interested in receiving updates on its results. By splitting the two up, we can drop the stream (corresponding to the updates) without dropping the handle (corresponding to the operation). It also allows us to cancel operations while receiving updates on its progress as well though this seems slightly less useful. Change-Id: Id80adec0a6859b74e4a876844cd96b2ccda11a29 --- include/perfetto/ext/base/threading/future.h | 4 + include/perfetto/ext/base/threading/spawn.h | 86 ++++---- include/perfetto/ext/base/threading/stream.h | 11 +- src/base/threading/spawn.cc | 2 +- src/base/threading/spawn_unittest.cc | 215 ++++++++++++++----- 5 files changed, 220 insertions(+), 98 deletions(-) diff --git a/include/perfetto/ext/base/threading/future.h b/include/perfetto/ext/base/threading/future.h index c509e9ba78..6ec6682e30 100644 --- a/include/perfetto/ext/base/threading/future.h +++ b/include/perfetto/ext/base/threading/future.h @@ -58,6 +58,10 @@ Future MakeFuture(Args... args) { // cancelled. Note, that the implementation of the source future still needs // to propogate cancellation across thread/socket/pipe boundary. // +// Note: Futures *must* be polled on the same thread on which they were created. +// The |SpawnResultFuture| can be used to move the results of Futures between +// threads in a safe manner. +// // Implementation note: // An important point to note is that Future is a final class. Implementation // of Future::Poll happens through an indirection layer by implementing the diff --git a/include/perfetto/ext/base/threading/spawn.h b/include/perfetto/ext/base/threading/spawn.h index df2f8ec06f..8e848fbd9c 100644 --- a/include/perfetto/ext/base/threading/spawn.h +++ b/include/perfetto/ext/base/threading/spawn.h @@ -57,6 +57,9 @@ class SpawnHandle { SpawnHandle(TaskRunner* task_runner, std::function()> fn); ~SpawnHandle(); + SpawnHandle(SpawnHandle&&) = default; + SpawnHandle& operator=(SpawnHandle&&) = default; + private: SpawnHandle(const SpawnHandle&) = delete; SpawnHandle& operator=(const SpawnHandle&) = delete; @@ -65,25 +68,6 @@ class SpawnHandle { std::shared_ptr> polled_future_; }; -// Specialization of SpawnHandle used by Futures/Streams which return T. -// -// Values of T are returned through a Channel which allows reading these -// values on a different thread to where the polling happens. -template -class ResultSpawnHandle { - public: - ResultSpawnHandle(TaskRunner* task_runner, - std::shared_ptr> channel, - std::function()> fn) - : handle_(task_runner, std::move(fn)), channel_(std::move(channel)) {} - - Channel* channel() const { return channel_.get(); } - - private: - SpawnHandle handle_; - std::shared_ptr> channel_; -}; - // "Spawns" a Future on the given TaskRunner and returns an RAII // SpawnHandle which can be used to cancel the spawn. // @@ -103,10 +87,20 @@ PERFETTO_WARN_UNUSED_RESULT inline SpawnHandle SpawnFuture( // Variant of |SpawnFuture| for a Stream allowing returning items of T. // -// See ResultSpawnHandle for how elements from the stream can be consumed. +// The Stream returned by this function can be consumed on any thread, not +// just the thread which ran this function. +// +// Dropping the returned stream does not affect the polling of the underlying +// stream (i.e. the stream returned by |fn|); the polled values will simply be +// dropped. +// +// Dropping the returned SpawnHandle causes the underlying stream to be +// cancelled and dropped ASAP (this happens on the TaskRunner thread so there +// can be some delay). The returned channel will return all the values that were +// produced by the underlying stream before the cancellation. template -PERFETTO_WARN_UNUSED_RESULT inline ResultSpawnHandle SpawnResultStream( - TaskRunner* task_runner, +PERFETTO_WARN_UNUSED_RESULT std::pair> SpawnResultStream( + TaskRunner* runner, std::function()> fn) { class AllVoidCollector : public Collector { public: @@ -114,28 +108,38 @@ PERFETTO_WARN_UNUSED_RESULT inline ResultSpawnHandle SpawnResultStream( FVoid OnDone() override { return FVoid(); } }; auto channel = std::make_shared>(4); - return ResultSpawnHandle( - task_runner, channel, [c = channel, fn = std::move(fn)]() { - return fn() - .MapFuture([c](T value) { - return WriteChannelFuture(c.get(), std::move(value)); - }) - .Concat(OnDestroyStream([c]() { c->Close(); })) - .Collect(std::unique_ptr>( - new AllVoidCollector())); - }); + auto control = std::make_shared>(1); + SpawnHandle handle(runner, [channel, control, fn = std::move(fn)]() { + return fn() + .MapFuture([channel, control](T value) mutable { + if (control->ReadNonBlocking().is_closed) { + return base::Future(base::FVoid()); + } + return WriteChannelFuture(channel.get(), std::move(value)); + }) + .Concat(OnDestroyStream([c = channel]() { c->Close(); })) + .template Collect(std::make_unique()); + }); + Stream stream = ReadChannelStream(channel.get()) + .Concat(OnDestroyStream([channel, control]() { + // Close the control stream and drain an element from + // the channel to unblock it in case it was blocked. + // NOTE: the ordering here is important as we could + // deadlock if it was the other way around! + control->Close(); + base::ignore_result(channel->ReadNonBlocking()); + })); + return std::make_pair(std::move(handle), std::move(stream)); } -// Variant of |SpawnFuture| for a Future allowing returning items of T. -// -// See ResultSpawnHandle for how elements from the future can be consumed. +// Variant of |SpawnResultStream| but for Future. template -PERFETTO_WARN_UNUSED_RESULT inline ResultSpawnHandle SpawnResultFuture( - TaskRunner* task_runner, - std::function()> fn) { - return SpawnResultStream(task_runner, [fn = std::move(fn)]() { - return StreamFromFuture(std::move(fn())); - }); +PERFETTO_WARN_UNUSED_RESULT inline std::pair> +SpawnResultFuture(TaskRunner* task_runner, std::function()> fn) { + auto [handle, stream] = SpawnResultStream( + task_runner, [fn = std::move(fn)]() { return StreamFromFuture(fn()); }); + return std::make_pair(std::move(handle), std::move(stream).Collect( + ToFutureCheckedCollector())); } } // namespace base diff --git a/include/perfetto/ext/base/threading/stream.h b/include/perfetto/ext/base/threading/stream.h index 4d67eb4b46..d12f0b9af1 100644 --- a/include/perfetto/ext/base/threading/stream.h +++ b/include/perfetto/ext/base/threading/stream.h @@ -46,6 +46,10 @@ Stream MakeStream(Args... args) { // version of Iterator. Long-running compute/IO operations which return // multiple values can be represented with a Stream. // +// Note: Streams *must* be polled on the same thread on which they were +// created. The |SpawnResultStreams| can be used to move of the results of +// Streams between threads in a safe manner. +// // Refer to the class documentation for Future as most of the features and // implementation of Future also apply to Stream. template @@ -129,7 +133,9 @@ Stream StreamOf(T first, Ts... rest) { // Creates a Stream which returns the value of |future| before completing. template Stream StreamFromFuture(Future future) { - return StreamOf(std::move(future)).MapFuture([](Future value) { return value; }); + return StreamOf(std::move(future)).MapFuture([](Future value) { + return value; + }); } // Creates a stream which returns no elements but calls |fn| in the destructor @@ -173,8 +179,7 @@ inline std::unique_ptr> ToFutureCheckedCollector() { // containing all the successful results from the stream. If any element is an // error, short-circuits the stream with the error. template -inline std::unique_ptr< - Collector, StatusOr>>> +inline std::unique_ptr, StatusOr>>> StatusOrVectorCollector() { return std::make_unique>(); } diff --git a/src/base/threading/spawn.cc b/src/base/threading/spawn.cc index e727f84206..7ef183dd39 100644 --- a/src/base/threading/spawn.cc +++ b/src/base/threading/spawn.cc @@ -74,7 +74,7 @@ class PolledFuture { } } - void ClearFutureAndWatches(FlatSet interested) { + void ClearFutureAndWatches(const FlatSet& interested) { future_ = std::nullopt; for (PlatformHandle fd : interested) { task_runner_->RemoveFileDescriptorWatch(fd); diff --git a/src/base/threading/spawn_unittest.cc b/src/base/threading/spawn_unittest.cc index c1a2e8435e..b14397e296 100644 --- a/src/base/threading/spawn_unittest.cc +++ b/src/base/threading/spawn_unittest.cc @@ -16,12 +16,15 @@ #include "perfetto/ext/base/threading/spawn.h" +#include #include +#include "perfetto/base/compiler.h" #include "perfetto/ext/base/event_fd.h" #include "perfetto/ext/base/thread_task_runner.h" #include "perfetto/ext/base/threading/future.h" #include "perfetto/ext/base/threading/poll.h" +#include "perfetto/ext/base/threading/stream.h" #include "perfetto/ext/base/threading/util.h" #include "perfetto/ext/base/unix_task_runner.h" #include "src/base/test/test_task_runner.h" @@ -46,87 +49,193 @@ class MockStreamPollable : public StreamPollable { MOCK_METHOD(StreamPollResult, PollNext, (PollContext*), (override)); }; -TEST(SpawnUnittest, SpawnFuture) { - base::TestTaskRunner task_runner; +class SpawnUnittest : public testing::Test { + protected: + void Drop(base::SpawnHandle) {} + void Drop(base::Stream) {} + + base::TestTaskRunner task_runner_; + + base::FlatSet interested_; + base::FlatSet ready_; + PollContext ctx_{&interested_, &ready_}; + + base::EventFd fd_; + std::unique_ptr> future_pollable_ = + std::make_unique>(); + std::unique_ptr> stream_pollable_ = + std::make_unique>(); +}; - base::EventFd fd; - auto pollable = std::make_unique>(); - EXPECT_CALL(*pollable, Poll(_)) - .WillOnce([&fd](PollContext* ctx) { - fd.Clear(); - ctx->RegisterInterested(fd.fd()); +TEST_F(SpawnUnittest, SpawnFuture) { + EXPECT_CALL(*future_pollable_, Poll(_)) + .WillOnce([this](PollContext* ctx) { + fd_.Clear(); + ctx->RegisterInterested(fd_.fd()); return PendingPollResult(); }) .WillOnce(Return(FuturePollResult(1024))); - auto res = SpawnResultFuture( - &task_runner, - [pollable = std::make_shared>>( - std::move(pollable))]() mutable { - return base::Future(std::move(*pollable)); + auto [handle, future] = + SpawnResultFuture(&task_runner_, [this]() mutable { + return base::Future(std::move(future_pollable_)); }); + base::ignore_result(handle); - task_runner.RunUntilIdle(); - ASSERT_EQ(res.channel()->ReadNonBlocking().item, std::nullopt); + task_runner_.RunUntilIdle(); + ASSERT_TRUE(future.Poll(&ctx_).IsPending()); - task_runner.RunUntilIdle(); - ASSERT_EQ(res.channel()->ReadNonBlocking().item, std::nullopt); + task_runner_.RunUntilIdle(); + ASSERT_TRUE(future.Poll(&ctx_).IsPending()); - fd.Notify(); - task_runner.RunUntilIdle(); + fd_.Notify(); + task_runner_.RunUntilIdle(); - auto read = res.channel()->ReadNonBlocking(); - ASSERT_EQ(read.item, 1024); - ASSERT_TRUE(read.is_closed); - - read = res.channel()->ReadNonBlocking(); - ASSERT_TRUE(read.is_closed); + ASSERT_EQ(future.Poll(&ctx_).item(), 1024); } -TEST(SpawnUnittest, SpawnStream) { - base::TestTaskRunner task_runner; - - base::EventFd fd; - auto pollable = std::make_unique>(); - EXPECT_CALL(*pollable, PollNext(_)) - .WillOnce([&fd](PollContext* ctx) { - fd.Clear(); - ctx->RegisterInterested(fd.fd()); +TEST_F(SpawnUnittest, SpawnStream) { + EXPECT_CALL(*stream_pollable_, PollNext(_)) + .WillOnce([this](PollContext* ctx) { + fd_.Clear(); + ctx->RegisterInterested(fd_.fd()); return PendingPollResult(); }) .WillOnce(Return(StreamPollResult(1024))) - .WillOnce([&fd](PollContext* ctx) { - fd.Clear(); - ctx->RegisterInterested(fd.fd()); + .WillOnce([this](PollContext* ctx) { + fd_.Clear(); + ctx->RegisterInterested(fd_.fd()); return PendingPollResult(); }) .WillOnce(Return(StreamPollResult(2048))) .WillOnce(Return(DonePollResult())); - auto res = SpawnResultStream( - &task_runner, - [pollable = std::make_shared>>( - std::move(pollable))]() mutable { - return base::Stream(std::move(*pollable)); + auto [handle, stream] = + SpawnResultStream(&task_runner_, [this]() mutable { + return base::Stream(std::move(stream_pollable_)); + }); + base::ignore_result(handle); + + task_runner_.RunUntilIdle(); + ASSERT_TRUE(stream.PollNext(&ctx_).IsPending()); + + fd_.Notify(); + task_runner_.RunUntilIdle(); + + ASSERT_EQ(stream.PollNext(&ctx_).item(), 1024); + + task_runner_.RunUntilIdle(); + ASSERT_TRUE(stream.PollNext(&ctx_).IsPending()); + + fd_.Notify(); + task_runner_.RunUntilIdle(); + + ASSERT_EQ(stream.PollNext(&ctx_).item(), 2048); + ASSERT_TRUE(stream.PollNext(&ctx_).IsDone()); +} + +TEST_F(SpawnUnittest, SpawnStreamDropStream) { + EXPECT_CALL(*stream_pollable_, PollNext(_)) + .WillOnce([this](PollContext* ctx) { + fd_.Clear(); + ctx->RegisterInterested(fd_.fd()); + return PendingPollResult(); + }) + .WillOnce(Return(StreamPollResult(1))) + .WillOnce(Return(StreamPollResult(2))) + .WillOnce(Return(StreamPollResult(4))) + .WillOnce(Return(StreamPollResult(8))) + .WillOnce(Return(StreamPollResult(16))) + .WillOnce(Return(StreamPollResult(32))) + .WillOnce(Return(StreamPollResult(64))) + .WillOnce(Return(StreamPollResult(128))) + .WillOnce(Return(StreamPollResult(256))) + .WillOnce(Return(StreamPollResult(512))) + .WillOnce(Return(DonePollResult())); + + auto [handle, stream] = + SpawnResultStream(&task_runner_, [this]() mutable { + return base::Stream(std::move(stream_pollable_)); + }); + base::ignore_result(handle); + + task_runner_.RunUntilIdle(); + ASSERT_TRUE(stream.PollNext(&ctx_).IsPending()); + + fd_.Notify(); + task_runner_.RunUntilIdle(); + + // We should get the first 4 elements and then nothing more: this corresponds + // to the internal channel buffer size being 4. + ASSERT_EQ(stream.PollNext(&ctx_).item(), 1); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 2); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 4); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 8); + ASSERT_TRUE(stream.PollNext(&ctx_).IsPending()); + + // Should fill up a bunch more elements. + task_runner_.RunUntilIdle(); + + // Drop the stream. + Drop(std::move(stream)); + + // This should complete the stream. + task_runner_.RunUntilIdle(); + + // Drop the handle and ensure any resulting is completed. + Drop(std::move(handle)); + task_runner_.RunUntilIdle(); +} + +TEST_F(SpawnUnittest, SpawnStreamDropHandle) { + EXPECT_CALL(*stream_pollable_, PollNext(_)) + .WillOnce([this](PollContext* ctx) { + fd_.Clear(); + ctx->RegisterInterested(fd_.fd()); + return PendingPollResult(); + }) + .WillOnce(Return(StreamPollResult(1))) + .WillOnce(Return(StreamPollResult(2))) + .WillOnce(Return(StreamPollResult(4))) + .WillOnce(Return(StreamPollResult(8))) + .WillOnce(Return(StreamPollResult(16))) + .WillOnce(Return(StreamPollResult(32))) + .WillOnce(Return(StreamPollResult(64))) + .WillOnce(Return(StreamPollResult(128))) + .WillOnce(Return(DonePollResult())); + + base::TestTaskRunner task_runner; + auto [handle, stream] = + SpawnResultStream(&task_runner, [this]() mutable { + return base::Stream(std::move(stream_pollable_)); }); + base::ignore_result(handle); task_runner.RunUntilIdle(); - ASSERT_EQ(res.channel()->ReadNonBlocking().item, std::nullopt); + ASSERT_TRUE(stream.PollNext(&ctx_).IsPending()); - fd.Notify(); + fd_.Notify(); task_runner.RunUntilIdle(); - auto read = res.channel()->ReadNonBlocking(); - ASSERT_EQ(read.item, 1024); - ASSERT_FALSE(read.is_closed); + // We should get the first 4 elements and then nothing more: this corresponds + // to the internal channel buffer size being 4. + ASSERT_EQ(stream.PollNext(&ctx_).item(), 1); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 2); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 4); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 8); + ASSERT_TRUE(stream.PollNext(&ctx_).IsPending()); + // Should fill up a bunch more elements. task_runner.RunUntilIdle(); - ASSERT_EQ(res.channel()->ReadNonBlocking().item, std::nullopt); - fd.Notify(); - task_runner.RunUntilIdle(); + // Drop the handle. + Drop(std::move(handle)); - read = res.channel()->ReadNonBlocking(); - ASSERT_EQ(read.item, 2048); - ASSERT_TRUE(read.is_closed); + // We should just get the next four buffered elements and the stream should + // complete. + ASSERT_EQ(stream.PollNext(&ctx_).item(), 16); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 32); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 64); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 128); + ASSERT_TRUE(stream.PollNext(&ctx_).IsDone()); } } // namespace