Skip to content

Commit

Permalink
Merge "ctp: propogate dropping of stream to trace processor in CTP" i…
Browse files Browse the repository at this point in the history
…nto main
  • Loading branch information
Treehugger Robot authored and Gerrit Code Review committed Sep 3, 2023
2 parents 2c366a1 + d8da5ad commit 4201d29
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 9 deletions.
37 changes: 31 additions & 6 deletions include/perfetto/ext/base/threading/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,24 @@ namespace base {

// Blocks the calling thread until |fd| is considered "readable". In Linux,
// this corresponds to |POLLOUT| or |POLLHUP| being returned if |fd| is polled.
inline void BlockUntilReadableFd(base::PlatformHandle fd) {
// If |timeout_ms| is specified, waits that many ms before stopping.
//
// Returns true if the function returned because the fd was readable and false
// otherwise.
inline bool BlockUntilReadableFd(
base::PlatformHandle fd,
std::optional<uint32_t> timeout_ms = std::nullopt) {
bool is_readable = false;
base::UnixTaskRunner runner;
runner.AddFileDescriptorWatch(fd, [&runner]() { runner.Quit(); });
runner.AddFileDescriptorWatch(fd, [&runner, &is_readable]() {
is_readable = true;
runner.Quit();
});
if (timeout_ms) {
runner.PostDelayedTask([&runner]() { runner.Quit(); }, *timeout_ms);
}
runner.Run();
return is_readable;
}

// Creates a Stream<T> which returns all the data from |channel| and completes
Expand Down Expand Up @@ -106,25 +120,35 @@ Future<FVoid> WriteChannelFuture(Channel<T>* channel, T item) {
// repeatedly. The returned stream only completes when |fn| returns
// std::nullopt.
//
// Callers can optionally specify a |on_destroy| function which is executed when
// the returned stream is destroyed. This is useful for informing the work
// spawned on the thread pool that the result is no longer necessary.
//
// The intended usage of this function is to schedule CPU intensive work on a
// background thread pool and receive regular "updates" on the progress by:
// a) breaking the work into chunks
// b) returning some indication of progress/partial results through |T|.
template <typename T>
Stream<T> RunOnThreadPool(ThreadPool* pool,
std::function<std::optional<T>()> fn) {
Stream<T> RunOnThreadPool(
ThreadPool* pool,
std::function<std::optional<T>()> fn,
std::function<void()> on_destroy = [] {}) {
class RunOnPoolImpl : public StreamPollable<T> {
public:
explicit RunOnPoolImpl(ThreadPool* pool,
std::function<std::optional<T>()> fn)
std::function<std::optional<T>()> fn,
std::function<void()> on_destroy)
: pool_(pool),
fn_(std::make_shared<std::function<std::optional<T>()>>(
std::move(fn))),
on_destroy_(std::move(on_destroy)),
channel_(new Channel<T>(1)),
channel_stream_(ReadChannelStream(channel_.get())) {
RunFn();
}

~RunOnPoolImpl() override { on_destroy_(); }

StreamPollResult<T> PollNext(PollContext* ctx) override {
ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, channel_stream_.PollNext(ctx));
if (res.IsDone()) {
Expand Down Expand Up @@ -154,10 +178,11 @@ Stream<T> RunOnThreadPool(ThreadPool* pool,

ThreadPool* pool_ = nullptr;
std::shared_ptr<std::function<std::optional<T>()>> fn_;
std::function<void()> on_destroy_;
std::shared_ptr<Channel<T>> channel_;
base::Stream<T> channel_stream_;
};
return MakeStream<RunOnPoolImpl>(pool, std::move(fn));
return MakeStream<RunOnPoolImpl>(pool, std::move(fn), std::move(on_destroy));
}

// Creates a Future<T> which yields the result of executing |fn| on |pool|. The
Expand Down
3 changes: 2 additions & 1 deletion src/cloud_trace_processor/trace_processor_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ base::StatusOrStream<protos::QueryTraceResponse> TraceProcessorWrapper::Query(
}
return base::RunOnThreadPool<StatusOrResponse>(
thread_pool_,
QueryRunner(trace_processor_, query, trace_path_, statefulness_));
QueryRunner(trace_processor_, query, trace_path_, statefulness_),
[tp = trace_processor_] { tp->InterruptQuery(); });
}

} // namespace cloud_trace_processor
Expand Down
43 changes: 41 additions & 2 deletions src/cloud_trace_processor/trace_processor_wrapper_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

#include "src/cloud_trace_processor/trace_processor_wrapper.h"
#include <cstdint>
#include <optional>
#include <vector>

#include "perfetto/base/flat_set.h"
#include "perfetto/base/platform_handle.h"
#include "perfetto/base/status.h"
#include "perfetto/base/time.h"
#include "perfetto/ext/base/status_or.h"
#include "perfetto/ext/base/string_utils.h"
#include "perfetto/ext/base/string_view.h"
Expand Down Expand Up @@ -59,19 +61,27 @@ std::vector<base::StatusOr<std::vector<uint8_t>>> SimpleSystraceChunked() {
}

template <typename T>
T WaitForFutureReady(base::Future<T>& future) {
std::optional<T> WaitForFutureReady(base::Future<T>& future,
std::optional<uint32_t> timeout_ms) {
base::FlatSet<base::PlatformHandle> ready;
base::FlatSet<base::PlatformHandle> interested;
base::PollContext ctx(&interested, &ready);
auto res = future.Poll(&ctx);
for (; res.IsPending(); res = future.Poll(&ctx)) {
PERFETTO_CHECK(interested.size() == 1);
base::BlockUntilReadableFd(*interested.begin());
if (!base::BlockUntilReadableFd(*interested.begin(), timeout_ms)) {
return std::nullopt;
}
interested = {};
}
return res.item();
}

template <typename T>
T WaitForFutureReady(base::Future<T>& future) {
return *WaitForFutureReady(future, std::nullopt);
}

template <typename T>
std::optional<T> WaitForStreamReady(base::Stream<T>& stream) {
base::FlatSet<base::PlatformHandle> ready;
Expand Down Expand Up @@ -200,6 +210,35 @@ TEST(TraceProcessorWrapperUnittest, Chunked) {
}
}

TEST(TraceProcessorWrapperUnittest, Interrupt) {
base::ThreadPool pool(1);
TraceProcessorWrapper wrapper("foobar", &pool, SF::kStateful);

// Create a query which will run ~forever. When this stream is dropped we
// should propogate to the TP instance to also stop running the query.
{
auto stream = wrapper.Query(
"WITH RECURSIVE nums AS ( "
"SELECT 1 num "
"UNION "
"SELECT num + 1 from nums WHERE num < 100000000000000) "
"SELECT COUNT(num) FROM nums");

// Wait for a bit for the thread to start running. To do something better
// we would need a way to figure out that the thread has started executing
// so we could stop. Unfortunately, this is quite a difficult problem to
// solve and probably not worth doing.
base::SleepMicroseconds(10 * 1000);
}

// Verify that we are able to run something on the thread pool in a reasonable
// amount of time.
{
auto future = base::RunOnceOnThreadPool<int>(&pool, []() { return 1; });
ASSERT_EQ(WaitForFutureReady(future, 250), 1);
}
}

} // namespace
} // namespace cloud_trace_processor
} // namespace perfetto

0 comments on commit 4201d29

Please sign in to comment.