diff --git a/include/perfetto/ext/base/threading/util.h b/include/perfetto/ext/base/threading/util.h index a08a4b0383..2a2b83eb81 100644 --- a/include/perfetto/ext/base/threading/util.h +++ b/include/perfetto/ext/base/threading/util.h @@ -38,10 +38,24 @@ namespace base { // Blocks the calling thread until |fd| is considered "readable". In Linux, // this corresponds to |POLLOUT| or |POLLHUP| being returned if |fd| is polled. -inline void BlockUntilReadableFd(base::PlatformHandle fd) { +// If |timeout_ms| is specified, waits that many ms before stopping. +// +// Returns true if the function returned because the fd was readable and false +// otherwise. +inline bool BlockUntilReadableFd( + base::PlatformHandle fd, + std::optional timeout_ms = std::nullopt) { + bool is_readable = false; base::UnixTaskRunner runner; - runner.AddFileDescriptorWatch(fd, [&runner]() { runner.Quit(); }); + runner.AddFileDescriptorWatch(fd, [&runner, &is_readable]() { + is_readable = true; + runner.Quit(); + }); + if (timeout_ms) { + runner.PostDelayedTask([&runner]() { runner.Quit(); }, *timeout_ms); + } runner.Run(); + return is_readable; } // Creates a Stream which returns all the data from |channel| and completes @@ -106,25 +120,35 @@ Future WriteChannelFuture(Channel* channel, T item) { // repeatedly. The returned stream only completes when |fn| returns // std::nullopt. // +// Callers can optionally specify a |on_destroy| function which is executed when +// the returned stream is destroyed. This is useful for informing the work +// spawned on the thread pool that the result is no longer necessary. +// // The intended usage of this function is to schedule CPU intensive work on a // background thread pool and receive regular "updates" on the progress by: // a) breaking the work into chunks // b) returning some indication of progress/partial results through |T|. template -Stream RunOnThreadPool(ThreadPool* pool, - std::function()> fn) { +Stream RunOnThreadPool( + ThreadPool* pool, + std::function()> fn, + std::function on_destroy = [] {}) { class RunOnPoolImpl : public StreamPollable { public: explicit RunOnPoolImpl(ThreadPool* pool, - std::function()> fn) + std::function()> fn, + std::function on_destroy) : pool_(pool), fn_(std::make_shared()>>( std::move(fn))), + on_destroy_(std::move(on_destroy)), channel_(new Channel(1)), channel_stream_(ReadChannelStream(channel_.get())) { RunFn(); } + ~RunOnPoolImpl() override { on_destroy_(); } + StreamPollResult PollNext(PollContext* ctx) override { ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, channel_stream_.PollNext(ctx)); if (res.IsDone()) { @@ -154,10 +178,11 @@ Stream RunOnThreadPool(ThreadPool* pool, ThreadPool* pool_ = nullptr; std::shared_ptr()>> fn_; + std::function on_destroy_; std::shared_ptr> channel_; base::Stream channel_stream_; }; - return MakeStream(pool, std::move(fn)); + return MakeStream(pool, std::move(fn), std::move(on_destroy)); } // Creates a Future which yields the result of executing |fn| on |pool|. The diff --git a/src/cloud_trace_processor/trace_processor_wrapper.cc b/src/cloud_trace_processor/trace_processor_wrapper.cc index 5093e5285d..a74dc933aa 100644 --- a/src/cloud_trace_processor/trace_processor_wrapper.cc +++ b/src/cloud_trace_processor/trace_processor_wrapper.cc @@ -149,7 +149,8 @@ TraceProcessorWrapper::Query(const std::string& query) { } return base::RunOnThreadPool( thread_pool_, - QueryRunner(trace_processor_, query, trace_path_, statefulness_)); + QueryRunner(trace_processor_, query, trace_path_, statefulness_), + [tp = trace_processor_] { tp->InterruptQuery(); }); } } // namespace cloud_trace_processor diff --git a/src/cloud_trace_processor/trace_processor_wrapper_unittest.cc b/src/cloud_trace_processor/trace_processor_wrapper_unittest.cc index 8739783db3..3baa3db74c 100644 --- a/src/cloud_trace_processor/trace_processor_wrapper_unittest.cc +++ b/src/cloud_trace_processor/trace_processor_wrapper_unittest.cc @@ -16,11 +16,13 @@ #include "src/cloud_trace_processor/trace_processor_wrapper.h" #include +#include #include #include "perfetto/base/flat_set.h" #include "perfetto/base/platform_handle.h" #include "perfetto/base/status.h" +#include "perfetto/base/time.h" #include "perfetto/ext/base/status_or.h" #include "perfetto/ext/base/string_utils.h" #include "perfetto/ext/base/string_view.h" @@ -59,19 +61,27 @@ std::vector>> SimpleSystraceChunked() { } template -T WaitForFutureReady(base::Future& future) { +std::optional WaitForFutureReady(base::Future& future, + std::optional timeout_ms) { base::FlatSet ready; base::FlatSet interested; base::PollContext ctx(&interested, &ready); auto res = future.Poll(&ctx); for (; res.IsPending(); res = future.Poll(&ctx)) { PERFETTO_CHECK(interested.size() == 1); - base::BlockUntilReadableFd(*interested.begin()); + if (!base::BlockUntilReadableFd(*interested.begin(), timeout_ms)) { + return std::nullopt; + } interested = {}; } return res.item(); } +template +T WaitForFutureReady(base::Future& future) { + return *WaitForFutureReady(future, std::nullopt); +} + template std::optional WaitForStreamReady(base::Stream& stream) { base::FlatSet ready; @@ -200,6 +210,35 @@ TEST(TraceProcessorWrapperUnittest, Chunked) { } } +TEST(TraceProcessorWrapperUnittest, Interrupt) { + base::ThreadPool pool(1); + TraceProcessorWrapper wrapper("foobar", &pool, SF::kStateful); + + // Create a query which will run ~forever. When this stream is dropped we + // should propogate to the TP instance to also stop running the query. + { + auto stream = wrapper.Query( + "WITH RECURSIVE nums AS ( " + "SELECT 1 num " + "UNION " + "SELECT num + 1 from nums WHERE num < 100000000000000) " + "SELECT COUNT(num) FROM nums"); + + // Wait for a bit for the thread to start running. To do something better + // we would need a way to figure out that the thread has started executing + // so we could stop. Unfortunately, this is quite a difficult problem to + // solve and probably not worth doing. + base::SleepMicroseconds(10 * 1000); + } + + // Verify that we are able to run something on the thread pool in a reasonable + // amount of time. + { + auto future = base::RunOnceOnThreadPool(&pool, []() { return 1; }); + ASSERT_EQ(WaitForFutureReady(future, 250), 1); + } +} + } // namespace } // namespace cloud_trace_processor } // namespace perfetto