diff --git a/include/perfetto/ext/base/threading/future.h b/include/perfetto/ext/base/threading/future.h index c509e9ba78..6ec6682e30 100644 --- a/include/perfetto/ext/base/threading/future.h +++ b/include/perfetto/ext/base/threading/future.h @@ -58,6 +58,10 @@ Future MakeFuture(Args... args) { // cancelled. Note, that the implementation of the source future still needs // to propogate cancellation across thread/socket/pipe boundary. // +// Note: Futures *must* be polled on the same thread on which they were created. +// The |SpawnResultFuture| can be used to move the results of Futures between +// threads in a safe manner. +// // Implementation note: // An important point to note is that Future is a final class. Implementation // of Future::Poll happens through an indirection layer by implementing the diff --git a/include/perfetto/ext/base/threading/spawn.h b/include/perfetto/ext/base/threading/spawn.h index df2f8ec06f..8e848fbd9c 100644 --- a/include/perfetto/ext/base/threading/spawn.h +++ b/include/perfetto/ext/base/threading/spawn.h @@ -57,6 +57,9 @@ class SpawnHandle { SpawnHandle(TaskRunner* task_runner, std::function()> fn); ~SpawnHandle(); + SpawnHandle(SpawnHandle&&) = default; + SpawnHandle& operator=(SpawnHandle&&) = default; + private: SpawnHandle(const SpawnHandle&) = delete; SpawnHandle& operator=(const SpawnHandle&) = delete; @@ -65,25 +68,6 @@ class SpawnHandle { std::shared_ptr> polled_future_; }; -// Specialization of SpawnHandle used by Futures/Streams which return T. -// -// Values of T are returned through a Channel which allows reading these -// values on a different thread to where the polling happens. -template -class ResultSpawnHandle { - public: - ResultSpawnHandle(TaskRunner* task_runner, - std::shared_ptr> channel, - std::function()> fn) - : handle_(task_runner, std::move(fn)), channel_(std::move(channel)) {} - - Channel* channel() const { return channel_.get(); } - - private: - SpawnHandle handle_; - std::shared_ptr> channel_; -}; - // "Spawns" a Future on the given TaskRunner and returns an RAII // SpawnHandle which can be used to cancel the spawn. // @@ -103,10 +87,20 @@ PERFETTO_WARN_UNUSED_RESULT inline SpawnHandle SpawnFuture( // Variant of |SpawnFuture| for a Stream allowing returning items of T. // -// See ResultSpawnHandle for how elements from the stream can be consumed. +// The Stream returned by this function can be consumed on any thread, not +// just the thread which ran this function. +// +// Dropping the returned stream does not affect the polling of the underlying +// stream (i.e. the stream returned by |fn|); the polled values will simply be +// dropped. +// +// Dropping the returned SpawnHandle causes the underlying stream to be +// cancelled and dropped ASAP (this happens on the TaskRunner thread so there +// can be some delay). The returned channel will return all the values that were +// produced by the underlying stream before the cancellation. template -PERFETTO_WARN_UNUSED_RESULT inline ResultSpawnHandle SpawnResultStream( - TaskRunner* task_runner, +PERFETTO_WARN_UNUSED_RESULT std::pair> SpawnResultStream( + TaskRunner* runner, std::function()> fn) { class AllVoidCollector : public Collector { public: @@ -114,28 +108,38 @@ PERFETTO_WARN_UNUSED_RESULT inline ResultSpawnHandle SpawnResultStream( FVoid OnDone() override { return FVoid(); } }; auto channel = std::make_shared>(4); - return ResultSpawnHandle( - task_runner, channel, [c = channel, fn = std::move(fn)]() { - return fn() - .MapFuture([c](T value) { - return WriteChannelFuture(c.get(), std::move(value)); - }) - .Concat(OnDestroyStream([c]() { c->Close(); })) - .Collect(std::unique_ptr>( - new AllVoidCollector())); - }); + auto control = std::make_shared>(1); + SpawnHandle handle(runner, [channel, control, fn = std::move(fn)]() { + return fn() + .MapFuture([channel, control](T value) mutable { + if (control->ReadNonBlocking().is_closed) { + return base::Future(base::FVoid()); + } + return WriteChannelFuture(channel.get(), std::move(value)); + }) + .Concat(OnDestroyStream([c = channel]() { c->Close(); })) + .template Collect(std::make_unique()); + }); + Stream stream = ReadChannelStream(channel.get()) + .Concat(OnDestroyStream([channel, control]() { + // Close the control stream and drain an element from + // the channel to unblock it in case it was blocked. + // NOTE: the ordering here is important as we could + // deadlock if it was the other way around! + control->Close(); + base::ignore_result(channel->ReadNonBlocking()); + })); + return std::make_pair(std::move(handle), std::move(stream)); } -// Variant of |SpawnFuture| for a Future allowing returning items of T. -// -// See ResultSpawnHandle for how elements from the future can be consumed. +// Variant of |SpawnResultStream| but for Future. template -PERFETTO_WARN_UNUSED_RESULT inline ResultSpawnHandle SpawnResultFuture( - TaskRunner* task_runner, - std::function()> fn) { - return SpawnResultStream(task_runner, [fn = std::move(fn)]() { - return StreamFromFuture(std::move(fn())); - }); +PERFETTO_WARN_UNUSED_RESULT inline std::pair> +SpawnResultFuture(TaskRunner* task_runner, std::function()> fn) { + auto [handle, stream] = SpawnResultStream( + task_runner, [fn = std::move(fn)]() { return StreamFromFuture(fn()); }); + return std::make_pair(std::move(handle), std::move(stream).Collect( + ToFutureCheckedCollector())); } } // namespace base diff --git a/include/perfetto/ext/base/threading/stream.h b/include/perfetto/ext/base/threading/stream.h index 4d67eb4b46..d12f0b9af1 100644 --- a/include/perfetto/ext/base/threading/stream.h +++ b/include/perfetto/ext/base/threading/stream.h @@ -46,6 +46,10 @@ Stream MakeStream(Args... args) { // version of Iterator. Long-running compute/IO operations which return // multiple values can be represented with a Stream. // +// Note: Streams *must* be polled on the same thread on which they were +// created. The |SpawnResultStreams| can be used to move of the results of +// Streams between threads in a safe manner. +// // Refer to the class documentation for Future as most of the features and // implementation of Future also apply to Stream. template @@ -129,7 +133,9 @@ Stream StreamOf(T first, Ts... rest) { // Creates a Stream which returns the value of |future| before completing. template Stream StreamFromFuture(Future future) { - return StreamOf(std::move(future)).MapFuture([](Future value) { return value; }); + return StreamOf(std::move(future)).MapFuture([](Future value) { + return value; + }); } // Creates a stream which returns no elements but calls |fn| in the destructor @@ -173,8 +179,7 @@ inline std::unique_ptr> ToFutureCheckedCollector() { // containing all the successful results from the stream. If any element is an // error, short-circuits the stream with the error. template -inline std::unique_ptr< - Collector, StatusOr>>> +inline std::unique_ptr, StatusOr>>> StatusOrVectorCollector() { return std::make_unique>(); } diff --git a/src/base/threading/spawn.cc b/src/base/threading/spawn.cc index e727f84206..7ef183dd39 100644 --- a/src/base/threading/spawn.cc +++ b/src/base/threading/spawn.cc @@ -74,7 +74,7 @@ class PolledFuture { } } - void ClearFutureAndWatches(FlatSet interested) { + void ClearFutureAndWatches(const FlatSet& interested) { future_ = std::nullopt; for (PlatformHandle fd : interested) { task_runner_->RemoveFileDescriptorWatch(fd); diff --git a/src/base/threading/spawn_unittest.cc b/src/base/threading/spawn_unittest.cc index c1a2e8435e..b14397e296 100644 --- a/src/base/threading/spawn_unittest.cc +++ b/src/base/threading/spawn_unittest.cc @@ -16,12 +16,15 @@ #include "perfetto/ext/base/threading/spawn.h" +#include #include +#include "perfetto/base/compiler.h" #include "perfetto/ext/base/event_fd.h" #include "perfetto/ext/base/thread_task_runner.h" #include "perfetto/ext/base/threading/future.h" #include "perfetto/ext/base/threading/poll.h" +#include "perfetto/ext/base/threading/stream.h" #include "perfetto/ext/base/threading/util.h" #include "perfetto/ext/base/unix_task_runner.h" #include "src/base/test/test_task_runner.h" @@ -46,87 +49,193 @@ class MockStreamPollable : public StreamPollable { MOCK_METHOD(StreamPollResult, PollNext, (PollContext*), (override)); }; -TEST(SpawnUnittest, SpawnFuture) { - base::TestTaskRunner task_runner; +class SpawnUnittest : public testing::Test { + protected: + void Drop(base::SpawnHandle) {} + void Drop(base::Stream) {} + + base::TestTaskRunner task_runner_; + + base::FlatSet interested_; + base::FlatSet ready_; + PollContext ctx_{&interested_, &ready_}; + + base::EventFd fd_; + std::unique_ptr> future_pollable_ = + std::make_unique>(); + std::unique_ptr> stream_pollable_ = + std::make_unique>(); +}; - base::EventFd fd; - auto pollable = std::make_unique>(); - EXPECT_CALL(*pollable, Poll(_)) - .WillOnce([&fd](PollContext* ctx) { - fd.Clear(); - ctx->RegisterInterested(fd.fd()); +TEST_F(SpawnUnittest, SpawnFuture) { + EXPECT_CALL(*future_pollable_, Poll(_)) + .WillOnce([this](PollContext* ctx) { + fd_.Clear(); + ctx->RegisterInterested(fd_.fd()); return PendingPollResult(); }) .WillOnce(Return(FuturePollResult(1024))); - auto res = SpawnResultFuture( - &task_runner, - [pollable = std::make_shared>>( - std::move(pollable))]() mutable { - return base::Future(std::move(*pollable)); + auto [handle, future] = + SpawnResultFuture(&task_runner_, [this]() mutable { + return base::Future(std::move(future_pollable_)); }); + base::ignore_result(handle); - task_runner.RunUntilIdle(); - ASSERT_EQ(res.channel()->ReadNonBlocking().item, std::nullopt); + task_runner_.RunUntilIdle(); + ASSERT_TRUE(future.Poll(&ctx_).IsPending()); - task_runner.RunUntilIdle(); - ASSERT_EQ(res.channel()->ReadNonBlocking().item, std::nullopt); + task_runner_.RunUntilIdle(); + ASSERT_TRUE(future.Poll(&ctx_).IsPending()); - fd.Notify(); - task_runner.RunUntilIdle(); + fd_.Notify(); + task_runner_.RunUntilIdle(); - auto read = res.channel()->ReadNonBlocking(); - ASSERT_EQ(read.item, 1024); - ASSERT_TRUE(read.is_closed); - - read = res.channel()->ReadNonBlocking(); - ASSERT_TRUE(read.is_closed); + ASSERT_EQ(future.Poll(&ctx_).item(), 1024); } -TEST(SpawnUnittest, SpawnStream) { - base::TestTaskRunner task_runner; - - base::EventFd fd; - auto pollable = std::make_unique>(); - EXPECT_CALL(*pollable, PollNext(_)) - .WillOnce([&fd](PollContext* ctx) { - fd.Clear(); - ctx->RegisterInterested(fd.fd()); +TEST_F(SpawnUnittest, SpawnStream) { + EXPECT_CALL(*stream_pollable_, PollNext(_)) + .WillOnce([this](PollContext* ctx) { + fd_.Clear(); + ctx->RegisterInterested(fd_.fd()); return PendingPollResult(); }) .WillOnce(Return(StreamPollResult(1024))) - .WillOnce([&fd](PollContext* ctx) { - fd.Clear(); - ctx->RegisterInterested(fd.fd()); + .WillOnce([this](PollContext* ctx) { + fd_.Clear(); + ctx->RegisterInterested(fd_.fd()); return PendingPollResult(); }) .WillOnce(Return(StreamPollResult(2048))) .WillOnce(Return(DonePollResult())); - auto res = SpawnResultStream( - &task_runner, - [pollable = std::make_shared>>( - std::move(pollable))]() mutable { - return base::Stream(std::move(*pollable)); + auto [handle, stream] = + SpawnResultStream(&task_runner_, [this]() mutable { + return base::Stream(std::move(stream_pollable_)); + }); + base::ignore_result(handle); + + task_runner_.RunUntilIdle(); + ASSERT_TRUE(stream.PollNext(&ctx_).IsPending()); + + fd_.Notify(); + task_runner_.RunUntilIdle(); + + ASSERT_EQ(stream.PollNext(&ctx_).item(), 1024); + + task_runner_.RunUntilIdle(); + ASSERT_TRUE(stream.PollNext(&ctx_).IsPending()); + + fd_.Notify(); + task_runner_.RunUntilIdle(); + + ASSERT_EQ(stream.PollNext(&ctx_).item(), 2048); + ASSERT_TRUE(stream.PollNext(&ctx_).IsDone()); +} + +TEST_F(SpawnUnittest, SpawnStreamDropStream) { + EXPECT_CALL(*stream_pollable_, PollNext(_)) + .WillOnce([this](PollContext* ctx) { + fd_.Clear(); + ctx->RegisterInterested(fd_.fd()); + return PendingPollResult(); + }) + .WillOnce(Return(StreamPollResult(1))) + .WillOnce(Return(StreamPollResult(2))) + .WillOnce(Return(StreamPollResult(4))) + .WillOnce(Return(StreamPollResult(8))) + .WillOnce(Return(StreamPollResult(16))) + .WillOnce(Return(StreamPollResult(32))) + .WillOnce(Return(StreamPollResult(64))) + .WillOnce(Return(StreamPollResult(128))) + .WillOnce(Return(StreamPollResult(256))) + .WillOnce(Return(StreamPollResult(512))) + .WillOnce(Return(DonePollResult())); + + auto [handle, stream] = + SpawnResultStream(&task_runner_, [this]() mutable { + return base::Stream(std::move(stream_pollable_)); + }); + base::ignore_result(handle); + + task_runner_.RunUntilIdle(); + ASSERT_TRUE(stream.PollNext(&ctx_).IsPending()); + + fd_.Notify(); + task_runner_.RunUntilIdle(); + + // We should get the first 4 elements and then nothing more: this corresponds + // to the internal channel buffer size being 4. + ASSERT_EQ(stream.PollNext(&ctx_).item(), 1); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 2); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 4); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 8); + ASSERT_TRUE(stream.PollNext(&ctx_).IsPending()); + + // Should fill up a bunch more elements. + task_runner_.RunUntilIdle(); + + // Drop the stream. + Drop(std::move(stream)); + + // This should complete the stream. + task_runner_.RunUntilIdle(); + + // Drop the handle and ensure any resulting is completed. + Drop(std::move(handle)); + task_runner_.RunUntilIdle(); +} + +TEST_F(SpawnUnittest, SpawnStreamDropHandle) { + EXPECT_CALL(*stream_pollable_, PollNext(_)) + .WillOnce([this](PollContext* ctx) { + fd_.Clear(); + ctx->RegisterInterested(fd_.fd()); + return PendingPollResult(); + }) + .WillOnce(Return(StreamPollResult(1))) + .WillOnce(Return(StreamPollResult(2))) + .WillOnce(Return(StreamPollResult(4))) + .WillOnce(Return(StreamPollResult(8))) + .WillOnce(Return(StreamPollResult(16))) + .WillOnce(Return(StreamPollResult(32))) + .WillOnce(Return(StreamPollResult(64))) + .WillOnce(Return(StreamPollResult(128))) + .WillOnce(Return(DonePollResult())); + + base::TestTaskRunner task_runner; + auto [handle, stream] = + SpawnResultStream(&task_runner, [this]() mutable { + return base::Stream(std::move(stream_pollable_)); }); + base::ignore_result(handle); task_runner.RunUntilIdle(); - ASSERT_EQ(res.channel()->ReadNonBlocking().item, std::nullopt); + ASSERT_TRUE(stream.PollNext(&ctx_).IsPending()); - fd.Notify(); + fd_.Notify(); task_runner.RunUntilIdle(); - auto read = res.channel()->ReadNonBlocking(); - ASSERT_EQ(read.item, 1024); - ASSERT_FALSE(read.is_closed); + // We should get the first 4 elements and then nothing more: this corresponds + // to the internal channel buffer size being 4. + ASSERT_EQ(stream.PollNext(&ctx_).item(), 1); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 2); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 4); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 8); + ASSERT_TRUE(stream.PollNext(&ctx_).IsPending()); + // Should fill up a bunch more elements. task_runner.RunUntilIdle(); - ASSERT_EQ(res.channel()->ReadNonBlocking().item, std::nullopt); - fd.Notify(); - task_runner.RunUntilIdle(); + // Drop the handle. + Drop(std::move(handle)); - read = res.channel()->ReadNonBlocking(); - ASSERT_EQ(read.item, 2048); - ASSERT_TRUE(read.is_closed); + // We should just get the next four buffered elements and the stream should + // complete. + ASSERT_EQ(stream.PollNext(&ctx_).item(), 16); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 32); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 64); + ASSERT_EQ(stream.PollNext(&ctx_).item(), 128); + ASSERT_TRUE(stream.PollNext(&ctx_).IsDone()); } } // namespace