From d8da5add90b88c5726ebf82b72ab57d90006d751 Mon Sep 17 00:00:00 2001 From: Lalit Maganti Date: Tue, 22 Aug 2023 23:29:17 +0100 Subject: [PATCH] ctp: propogate dropping of stream to trace processor in CTP This CL fixes the propogation of query cancellation from CTP to TP. When the stream is dropped, interrupt the trace processor instance. This is critical for cancelling inefficient queries. Change-Id: Ib03c1bfa219db3e70b310e1360ef695545f62a6a Bug: 278208757 --- include/perfetto/ext/base/threading/util.h | 37 +++++++++++++--- .../trace_processor_wrapper.cc | 3 +- .../trace_processor_wrapper_unittest.cc | 43 ++++++++++++++++++- 3 files changed, 74 insertions(+), 9 deletions(-) diff --git a/include/perfetto/ext/base/threading/util.h b/include/perfetto/ext/base/threading/util.h index a08a4b0383..2a2b83eb81 100644 --- a/include/perfetto/ext/base/threading/util.h +++ b/include/perfetto/ext/base/threading/util.h @@ -38,10 +38,24 @@ namespace base { // Blocks the calling thread until |fd| is considered "readable". In Linux, // this corresponds to |POLLOUT| or |POLLHUP| being returned if |fd| is polled. -inline void BlockUntilReadableFd(base::PlatformHandle fd) { +// If |timeout_ms| is specified, waits that many ms before stopping. +// +// Returns true if the function returned because the fd was readable and false +// otherwise. +inline bool BlockUntilReadableFd( + base::PlatformHandle fd, + std::optional timeout_ms = std::nullopt) { + bool is_readable = false; base::UnixTaskRunner runner; - runner.AddFileDescriptorWatch(fd, [&runner]() { runner.Quit(); }); + runner.AddFileDescriptorWatch(fd, [&runner, &is_readable]() { + is_readable = true; + runner.Quit(); + }); + if (timeout_ms) { + runner.PostDelayedTask([&runner]() { runner.Quit(); }, *timeout_ms); + } runner.Run(); + return is_readable; } // Creates a Stream which returns all the data from |channel| and completes @@ -106,25 +120,35 @@ Future WriteChannelFuture(Channel* channel, T item) { // repeatedly. The returned stream only completes when |fn| returns // std::nullopt. // +// Callers can optionally specify a |on_destroy| function which is executed when +// the returned stream is destroyed. This is useful for informing the work +// spawned on the thread pool that the result is no longer necessary. +// // The intended usage of this function is to schedule CPU intensive work on a // background thread pool and receive regular "updates" on the progress by: // a) breaking the work into chunks // b) returning some indication of progress/partial results through |T|. template -Stream RunOnThreadPool(ThreadPool* pool, - std::function()> fn) { +Stream RunOnThreadPool( + ThreadPool* pool, + std::function()> fn, + std::function on_destroy = [] {}) { class RunOnPoolImpl : public StreamPollable { public: explicit RunOnPoolImpl(ThreadPool* pool, - std::function()> fn) + std::function()> fn, + std::function on_destroy) : pool_(pool), fn_(std::make_shared()>>( std::move(fn))), + on_destroy_(std::move(on_destroy)), channel_(new Channel(1)), channel_stream_(ReadChannelStream(channel_.get())) { RunFn(); } + ~RunOnPoolImpl() override { on_destroy_(); } + StreamPollResult PollNext(PollContext* ctx) override { ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, channel_stream_.PollNext(ctx)); if (res.IsDone()) { @@ -154,10 +178,11 @@ Stream RunOnThreadPool(ThreadPool* pool, ThreadPool* pool_ = nullptr; std::shared_ptr()>> fn_; + std::function on_destroy_; std::shared_ptr> channel_; base::Stream channel_stream_; }; - return MakeStream(pool, std::move(fn)); + return MakeStream(pool, std::move(fn), std::move(on_destroy)); } // Creates a Future which yields the result of executing |fn| on |pool|. The diff --git a/src/cloud_trace_processor/trace_processor_wrapper.cc b/src/cloud_trace_processor/trace_processor_wrapper.cc index 5093e5285d..a74dc933aa 100644 --- a/src/cloud_trace_processor/trace_processor_wrapper.cc +++ b/src/cloud_trace_processor/trace_processor_wrapper.cc @@ -149,7 +149,8 @@ TraceProcessorWrapper::Query(const std::string& query) { } return base::RunOnThreadPool( thread_pool_, - QueryRunner(trace_processor_, query, trace_path_, statefulness_)); + QueryRunner(trace_processor_, query, trace_path_, statefulness_), + [tp = trace_processor_] { tp->InterruptQuery(); }); } } // namespace cloud_trace_processor diff --git a/src/cloud_trace_processor/trace_processor_wrapper_unittest.cc b/src/cloud_trace_processor/trace_processor_wrapper_unittest.cc index 8739783db3..3baa3db74c 100644 --- a/src/cloud_trace_processor/trace_processor_wrapper_unittest.cc +++ b/src/cloud_trace_processor/trace_processor_wrapper_unittest.cc @@ -16,11 +16,13 @@ #include "src/cloud_trace_processor/trace_processor_wrapper.h" #include +#include #include #include "perfetto/base/flat_set.h" #include "perfetto/base/platform_handle.h" #include "perfetto/base/status.h" +#include "perfetto/base/time.h" #include "perfetto/ext/base/status_or.h" #include "perfetto/ext/base/string_utils.h" #include "perfetto/ext/base/string_view.h" @@ -59,19 +61,27 @@ std::vector>> SimpleSystraceChunked() { } template -T WaitForFutureReady(base::Future& future) { +std::optional WaitForFutureReady(base::Future& future, + std::optional timeout_ms) { base::FlatSet ready; base::FlatSet interested; base::PollContext ctx(&interested, &ready); auto res = future.Poll(&ctx); for (; res.IsPending(); res = future.Poll(&ctx)) { PERFETTO_CHECK(interested.size() == 1); - base::BlockUntilReadableFd(*interested.begin()); + if (!base::BlockUntilReadableFd(*interested.begin(), timeout_ms)) { + return std::nullopt; + } interested = {}; } return res.item(); } +template +T WaitForFutureReady(base::Future& future) { + return *WaitForFutureReady(future, std::nullopt); +} + template std::optional WaitForStreamReady(base::Stream& stream) { base::FlatSet ready; @@ -200,6 +210,35 @@ TEST(TraceProcessorWrapperUnittest, Chunked) { } } +TEST(TraceProcessorWrapperUnittest, Interrupt) { + base::ThreadPool pool(1); + TraceProcessorWrapper wrapper("foobar", &pool, SF::kStateful); + + // Create a query which will run ~forever. When this stream is dropped we + // should propogate to the TP instance to also stop running the query. + { + auto stream = wrapper.Query( + "WITH RECURSIVE nums AS ( " + "SELECT 1 num " + "UNION " + "SELECT num + 1 from nums WHERE num < 100000000000000) " + "SELECT COUNT(num) FROM nums"); + + // Wait for a bit for the thread to start running. To do something better + // we would need a way to figure out that the thread has started executing + // so we could stop. Unfortunately, this is quite a difficult problem to + // solve and probably not worth doing. + base::SleepMicroseconds(10 * 1000); + } + + // Verify that we are able to run something on the thread pool in a reasonable + // amount of time. + { + auto future = base::RunOnceOnThreadPool(&pool, []() { return 1; }); + ASSERT_EQ(WaitForFutureReady(future, 250), 1); + } +} + } // namespace } // namespace cloud_trace_processor } // namespace perfetto