Skip to content

Commit

Permalink
ctp: propogate dropping of stream to trace processor in CTP
Browse files Browse the repository at this point in the history
This CL fixes the propogation of query cancellation from CTP to TP. When
the stream is dropped, interrupt the trace processor instance. This is
critical for cancelling inefficient queries.

Change-Id: Ib03c1bfa219db3e70b310e1360ef695545f62a6a
Bug: 278208757
  • Loading branch information
LalitMaganti committed Aug 22, 2023
1 parent 567762c commit d8da5ad
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 9 deletions.
37 changes: 31 additions & 6 deletions include/perfetto/ext/base/threading/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,24 @@ namespace base {

// Blocks the calling thread until |fd| is considered "readable". In Linux,
// this corresponds to |POLLOUT| or |POLLHUP| being returned if |fd| is polled.
inline void BlockUntilReadableFd(base::PlatformHandle fd) {
// If |timeout_ms| is specified, waits that many ms before stopping.
//
// Returns true if the function returned because the fd was readable and false
// otherwise.
inline bool BlockUntilReadableFd(
base::PlatformHandle fd,
std::optional<uint32_t> timeout_ms = std::nullopt) {
bool is_readable = false;
base::UnixTaskRunner runner;
runner.AddFileDescriptorWatch(fd, [&runner]() { runner.Quit(); });
runner.AddFileDescriptorWatch(fd, [&runner, &is_readable]() {
is_readable = true;
runner.Quit();
});
if (timeout_ms) {
runner.PostDelayedTask([&runner]() { runner.Quit(); }, *timeout_ms);
}
runner.Run();
return is_readable;
}

// Creates a Stream<T> which returns all the data from |channel| and completes
Expand Down Expand Up @@ -106,25 +120,35 @@ Future<FVoid> WriteChannelFuture(Channel<T>* channel, T item) {
// repeatedly. The returned stream only completes when |fn| returns
// std::nullopt.
//
// Callers can optionally specify a |on_destroy| function which is executed when
// the returned stream is destroyed. This is useful for informing the work
// spawned on the thread pool that the result is no longer necessary.
//
// The intended usage of this function is to schedule CPU intensive work on a
// background thread pool and receive regular "updates" on the progress by:
// a) breaking the work into chunks
// b) returning some indication of progress/partial results through |T|.
template <typename T>
Stream<T> RunOnThreadPool(ThreadPool* pool,
std::function<std::optional<T>()> fn) {
Stream<T> RunOnThreadPool(
ThreadPool* pool,
std::function<std::optional<T>()> fn,
std::function<void()> on_destroy = [] {}) {
class RunOnPoolImpl : public StreamPollable<T> {
public:
explicit RunOnPoolImpl(ThreadPool* pool,
std::function<std::optional<T>()> fn)
std::function<std::optional<T>()> fn,
std::function<void()> on_destroy)
: pool_(pool),
fn_(std::make_shared<std::function<std::optional<T>()>>(
std::move(fn))),
on_destroy_(std::move(on_destroy)),
channel_(new Channel<T>(1)),
channel_stream_(ReadChannelStream(channel_.get())) {
RunFn();
}

~RunOnPoolImpl() override { on_destroy_(); }

StreamPollResult<T> PollNext(PollContext* ctx) override {
ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, channel_stream_.PollNext(ctx));
if (res.IsDone()) {
Expand Down Expand Up @@ -154,10 +178,11 @@ Stream<T> RunOnThreadPool(ThreadPool* pool,

ThreadPool* pool_ = nullptr;
std::shared_ptr<std::function<std::optional<T>()>> fn_;
std::function<void()> on_destroy_;
std::shared_ptr<Channel<T>> channel_;
base::Stream<T> channel_stream_;
};
return MakeStream<RunOnPoolImpl>(pool, std::move(fn));
return MakeStream<RunOnPoolImpl>(pool, std::move(fn), std::move(on_destroy));
}

// Creates a Future<T> which yields the result of executing |fn| on |pool|. The
Expand Down
3 changes: 2 additions & 1 deletion src/cloud_trace_processor/trace_processor_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ TraceProcessorWrapper::Query(const std::string& query) {
}
return base::RunOnThreadPool<StatusOrResponse>(
thread_pool_,
QueryRunner(trace_processor_, query, trace_path_, statefulness_));
QueryRunner(trace_processor_, query, trace_path_, statefulness_),
[tp = trace_processor_] { tp->InterruptQuery(); });
}

} // namespace cloud_trace_processor
Expand Down
43 changes: 41 additions & 2 deletions src/cloud_trace_processor/trace_processor_wrapper_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

#include "src/cloud_trace_processor/trace_processor_wrapper.h"
#include <cstdint>
#include <optional>
#include <vector>

#include "perfetto/base/flat_set.h"
#include "perfetto/base/platform_handle.h"
#include "perfetto/base/status.h"
#include "perfetto/base/time.h"
#include "perfetto/ext/base/status_or.h"
#include "perfetto/ext/base/string_utils.h"
#include "perfetto/ext/base/string_view.h"
Expand Down Expand Up @@ -59,19 +61,27 @@ std::vector<base::StatusOr<std::vector<uint8_t>>> SimpleSystraceChunked() {
}

template <typename T>
T WaitForFutureReady(base::Future<T>& future) {
std::optional<T> WaitForFutureReady(base::Future<T>& future,
std::optional<uint32_t> timeout_ms) {
base::FlatSet<base::PlatformHandle> ready;
base::FlatSet<base::PlatformHandle> interested;
base::PollContext ctx(&interested, &ready);
auto res = future.Poll(&ctx);
for (; res.IsPending(); res = future.Poll(&ctx)) {
PERFETTO_CHECK(interested.size() == 1);
base::BlockUntilReadableFd(*interested.begin());
if (!base::BlockUntilReadableFd(*interested.begin(), timeout_ms)) {
return std::nullopt;
}
interested = {};
}
return res.item();
}

template <typename T>
T WaitForFutureReady(base::Future<T>& future) {
return *WaitForFutureReady(future, std::nullopt);
}

template <typename T>
std::optional<T> WaitForStreamReady(base::Stream<T>& stream) {
base::FlatSet<base::PlatformHandle> ready;
Expand Down Expand Up @@ -200,6 +210,35 @@ TEST(TraceProcessorWrapperUnittest, Chunked) {
}
}

TEST(TraceProcessorWrapperUnittest, Interrupt) {
base::ThreadPool pool(1);
TraceProcessorWrapper wrapper("foobar", &pool, SF::kStateful);

// Create a query which will run ~forever. When this stream is dropped we
// should propogate to the TP instance to also stop running the query.
{
auto stream = wrapper.Query(
"WITH RECURSIVE nums AS ( "
"SELECT 1 num "
"UNION "
"SELECT num + 1 from nums WHERE num < 100000000000000) "
"SELECT COUNT(num) FROM nums");

// Wait for a bit for the thread to start running. To do something better
// we would need a way to figure out that the thread has started executing
// so we could stop. Unfortunately, this is quite a difficult problem to
// solve and probably not worth doing.
base::SleepMicroseconds(10 * 1000);
}

// Verify that we are able to run something on the thread pool in a reasonable
// amount of time.
{
auto future = base::RunOnceOnThreadPool<int>(&pool, []() { return 1; });
ASSERT_EQ(WaitForFutureReady(future, 250), 1);
}
}

} // namespace
} // namespace cloud_trace_processor
} // namespace perfetto

0 comments on commit d8da5ad

Please sign in to comment.