From 2c366a1fd06a42f0add9e94a15e3d65a370e25ee Mon Sep 17 00:00:00 2001 From: Lalit Maganti Date: Sat, 2 Sep 2023 23:35:55 +0100 Subject: [PATCH] ctp: overhaul worker API to make everything more resilient This CL attempts to make CTP more resilient to workers restarting. Specifically on Google infra, restarts of workers is fully expected every few days and we need to be able to survive these. The only real way to do this is to make all the worker state ephemeral and make the orchestrator the source of truth. We do this by moving to a "reconciliation" (i.e. Kubernetes) syle model. Concretely, the orchestrator holds onto all the state and pushes this out to the workers every 15s. That way, even if a worker restarts, it'll be in sync within some time. This allows significant simplifications of the worker APIs as we no longer need to track create/destroy etc only the presence/absence of stateless traces. While doing this, also strip out all notion of stateful trace processing which will need some serious thoughts on designing this without losing state. In the future, we might even make the manager stateless by moving that state to a database system which can be accessed by all the components. That way, the worker can pull the state rather than the manager pushing it out. That again requires some significant thought and introduces yet another component so we don't do that in this CL. Change-Id: I8aadce37f86fbaddab59a512ceae61b9f7de755f Bug: 278208757 --- Android.bp | 4 - BUILD | 1 - include/perfetto/ext/base/threading/spawn.h | 3 +- .../ext/cloud_trace_processor/orchestrator.h | 2 + .../ext/cloud_trace_processor/worker.h | 38 ++- .../perfetto/cloud_trace_processor/BUILD.gn | 1 - .../cloud_trace_processor/common.proto | 33 --- .../cloud_trace_processor/orchestrator.proto | 28 +-- .../cloud_trace_processor/worker.proto | 92 ++------ .../orchestrator_impl.cc | 218 +++++++++--------- src/cloud_trace_processor/orchestrator_impl.h | 22 +- .../trace_processor_wrapper.cc | 20 +- .../trace_processor_wrapper.h | 4 +- src/cloud_trace_processor/worker_impl.cc | 118 ++++------ src/cloud_trace_processor/worker_impl.h | 31 +-- 15 files changed, 261 insertions(+), 354 deletions(-) delete mode 100644 protos/perfetto/cloud_trace_processor/common.proto diff --git a/Android.bp b/Android.bp index cfad29297c..e239297eff 100644 --- a/Android.bp +++ b/Android.bp @@ -2486,7 +2486,6 @@ cc_test { genrule { name: "perfetto_protos_perfetto_cloud_trace_processor_lite_gen", srcs: [ - "protos/perfetto/cloud_trace_processor/common.proto", "protos/perfetto/cloud_trace_processor/orchestrator.proto", "protos/perfetto/cloud_trace_processor/worker.proto", ], @@ -2495,7 +2494,6 @@ genrule { ], cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --cpp_out=lite=true:$(genDir)/external/perfetto/ $(in)", out: [ - "external/perfetto/protos/perfetto/cloud_trace_processor/common.pb.cc", "external/perfetto/protos/perfetto/cloud_trace_processor/orchestrator.pb.cc", "external/perfetto/protos/perfetto/cloud_trace_processor/worker.pb.cc", ], @@ -2505,7 +2503,6 @@ genrule { genrule { name: "perfetto_protos_perfetto_cloud_trace_processor_lite_gen_headers", srcs: [ - "protos/perfetto/cloud_trace_processor/common.proto", "protos/perfetto/cloud_trace_processor/orchestrator.proto", "protos/perfetto/cloud_trace_processor/worker.proto", ], @@ -2514,7 +2511,6 @@ genrule { ], cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --cpp_out=lite=true:$(genDir)/external/perfetto/ $(in)", out: [ - "external/perfetto/protos/perfetto/cloud_trace_processor/common.pb.h", "external/perfetto/protos/perfetto/cloud_trace_processor/orchestrator.pb.h", "external/perfetto/protos/perfetto/cloud_trace_processor/worker.pb.h", ], diff --git a/BUILD b/BUILD index b808efab36..a219cebdfe 100644 --- a/BUILD +++ b/BUILD @@ -3463,7 +3463,6 @@ perfetto_cc_proto_library( perfetto_proto_library( name = "protos_perfetto_cloud_trace_processor_protos", srcs = [ - "protos/perfetto/cloud_trace_processor/common.proto", "protos/perfetto/cloud_trace_processor/orchestrator.proto", "protos/perfetto/cloud_trace_processor/worker.proto", ], diff --git a/include/perfetto/ext/base/threading/spawn.h b/include/perfetto/ext/base/threading/spawn.h index 8e848fbd9c..4f650521dc 100644 --- a/include/perfetto/ext/base/threading/spawn.h +++ b/include/perfetto/ext/base/threading/spawn.h @@ -55,11 +55,12 @@ class PolledFuture; class SpawnHandle { public: SpawnHandle(TaskRunner* task_runner, std::function()> fn); - ~SpawnHandle(); SpawnHandle(SpawnHandle&&) = default; SpawnHandle& operator=(SpawnHandle&&) = default; + ~SpawnHandle(); + private: SpawnHandle(const SpawnHandle&) = delete; SpawnHandle& operator=(const SpawnHandle&) = delete; diff --git a/include/perfetto/ext/cloud_trace_processor/orchestrator.h b/include/perfetto/ext/cloud_trace_processor/orchestrator.h index 7983161853..4501f6ef04 100644 --- a/include/perfetto/ext/cloud_trace_processor/orchestrator.h +++ b/include/perfetto/ext/cloud_trace_processor/orchestrator.h @@ -21,6 +21,7 @@ #include #include "perfetto/base/status.h" +#include "perfetto/base/task_runner.h" #include "perfetto/ext/base/status_or.h" #include "perfetto/ext/base/threading/future.h" #include "perfetto/ext/base/threading/stream.h" @@ -60,6 +61,7 @@ class Orchestrator { // process or even on another machine); the returned manager will gracefully // handle this. static std::unique_ptr CreateInProcess( + base::TaskRunner*, std::vector> workers); // Creates a TracePool with the specified arguments. diff --git a/include/perfetto/ext/cloud_trace_processor/worker.h b/include/perfetto/ext/cloud_trace_processor/worker.h index 60dbe7763a..d96c495940 100644 --- a/include/perfetto/ext/cloud_trace_processor/worker.h +++ b/include/perfetto/ext/cloud_trace_processor/worker.h @@ -20,6 +20,7 @@ #include #include +#include "perfetto/base/task_runner.h" #include "perfetto/ext/base/threading/future.h" #include "perfetto/ext/base/threading/stream.h" @@ -30,17 +31,11 @@ class ThreadPool; } namespace protos { -class TracePoolShardCreateArgs; -class TracePoolShardCreateResponse; +class SyncTraceStateArgs; +class SyncTraceStateResponse; -class TracePoolShardSetTracesArgs; -class TracePoolShardSetTracesResponse; - -class TracePoolShardQueryArgs; -class TracePoolShardQueryResponse; - -class TracePoolShardDestroyArgs; -class TracePoolShardDestroyResponse; +class QueryTraceArgs; +class QueryTraceResponse; } // namespace protos namespace cloud_trace_processor { @@ -58,24 +53,17 @@ class Worker { // |CtpEnvironment| and a |ThreadPool|. The |CtpEnvironment| will be used to // perform any interaction with the OS (e.g. opening and reading files) and // the |ThreadPool| will be used to dispatch requests to TraceProcessor. - static std::unique_ptr CreateInProcesss(CtpEnvironment*, + static std::unique_ptr CreateInProcesss(base::TaskRunner*, + CtpEnvironment*, base::ThreadPool*); - // Creates a TracePoolShard which will be owned by this worker. - virtual base::StatusOrFuture - TracePoolShardCreate(const protos::TracePoolShardCreateArgs&) = 0; - - // Associates the provided list of traces to this TracePoolShard. - virtual base::StatusOrStream - TracePoolShardSetTraces(const protos::TracePoolShardSetTracesArgs&) = 0; - - // Executes a SQL query on the specified TracePoolShard. - virtual base::StatusOrStream - TracePoolShardQuery(const protos::TracePoolShardQueryArgs&) = 0; + // Synchronize the state of the traces in the worker to the orchestrator. + virtual base::StatusOrStream SyncTraceState( + const protos::SyncTraceStateArgs&) = 0; - // Destroys the TracePoolShard with the specified id. - virtual base::StatusOrFuture - TracePoolShardDestroy(const protos::TracePoolShardDestroyArgs&) = 0; + // Executes a SQL query on the specified trace. + virtual base::StatusOrStream QueryTrace( + const protos::QueryTraceArgs&) = 0; }; } // namespace cloud_trace_processor diff --git a/protos/perfetto/cloud_trace_processor/BUILD.gn b/protos/perfetto/cloud_trace_processor/BUILD.gn index 83464341ca..d3178fe5a6 100644 --- a/protos/perfetto/cloud_trace_processor/BUILD.gn +++ b/protos/perfetto/cloud_trace_processor/BUILD.gn @@ -15,7 +15,6 @@ import("../../../gn/proto_library.gni") SOURCES = [ - "common.proto", "orchestrator.proto", "worker.proto", ] diff --git a/protos/perfetto/cloud_trace_processor/common.proto b/protos/perfetto/cloud_trace_processor/common.proto deleted file mode 100644 index f2fd5cb9d2..0000000000 --- a/protos/perfetto/cloud_trace_processor/common.proto +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (C) 2023 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -syntax = "proto2"; - -package perfetto.protos; - -enum TracePoolType { - TYPE_UNKNOWN = 0; - - // Indicates that the trace pool can be accessed by more than one user. This - // implies the pool is "stateless" (i.e. TraceProcessor instances do not - // retain state between RPCs). - SHARED = 1; - - // Indicates that the trace pool is only accessible by a single user at a - // time. This implies the pool is "stateful" (i.e. TraceProcessor instances - // retain state across RPCs). - DEDICATED = 2; -} diff --git a/protos/perfetto/cloud_trace_processor/orchestrator.proto b/protos/perfetto/cloud_trace_processor/orchestrator.proto index 2f16e23463..3ac84fe996 100644 --- a/protos/perfetto/cloud_trace_processor/orchestrator.proto +++ b/protos/perfetto/cloud_trace_processor/orchestrator.proto @@ -19,7 +19,6 @@ syntax = "proto2"; package perfetto.protos; import "protos/perfetto/trace_processor/trace_processor.proto"; -import "protos/perfetto/cloud_trace_processor/common.proto"; // RPC interface for a CloudTraceProcessor Orchestrator. // @@ -32,20 +31,16 @@ service CloudTraceProcessorOrchestrator { // A TracePool is a logical group of traces which can be addressed with a // single id. // - // Pools can be "shared" or "dedicated": - // a) a shared pool has the trace processor instances backing the pool shared - // among a group of users. This implicitly means that the pools are - // "stateless" (i.e. do not preserve trace processor state between RPCs) as - // the state of one user should not interfere with the state of another. - // b) a dedicated pool belongs to a single user and can only be accessed - // by that user. These pools are "stateful" i.e. preserve trace processor - // state between RPCs. + // Pools are shared: the trace processor instances backing the pool are shared + // among a group of users. This implicitly means that the pools are + // "stateless" (i.e. do not preserve trace processor state between RPCs) as + // the state of one user should not interfere with the state of another. rpc TracePoolCreate(TracePoolCreateArgs) returns (TracePoolCreateResponse); // Changes the set of traces associated with the specified TracePool. // // If this operation completes successfully, any future requests to this pool - // shard will refer to this set of traces. + // will refer to this set of traces. rpc TracePoolSetTraces(TracePoolSetTracesArgs) returns (TracePoolSetTracesResponse); @@ -61,22 +56,19 @@ service CloudTraceProcessorOrchestrator { // Destroys the TracePool with the specified id. // // Any future requests to this pool will return an error. However, the - // same pool id (if a named pool) can be used to create a new pool. + // same pool id can be used to create a new pool. rpc TracePoolDestroy(TracePoolDestroyArgs) returns (TracePoolDestroyResponse); } // Request/Response for Orchestrator::TracePoolCreate. message TracePoolCreateArgs { - optional TracePoolType pool_type = 1; - - // If |pool_type| == SHARED, the name which should be refer to the pool. This - // name will form part of |pool_id|. - optional string shared_pool_name = 2; + // The name which should be refer to the pool. This name will form part of + // |pool_id|. + optional string pool_name = 2; } message TracePoolCreateResponse { // The id of the pool which should be used to reference the pool in all future - // RPCs. For shared pools, this id is expected to be a stable transformation - // of |shared_pool_name|. + // RPCs. This id is expected to be a stable transformation of |pool_name|. optional string pool_id = 1; } diff --git a/protos/perfetto/cloud_trace_processor/worker.proto b/protos/perfetto/cloud_trace_processor/worker.proto index 407ef90d32..9289ab2c71 100644 --- a/protos/perfetto/cloud_trace_processor/worker.proto +++ b/protos/perfetto/cloud_trace_processor/worker.proto @@ -19,87 +19,41 @@ syntax = "proto2"; package perfetto.protos; import "protos/perfetto/trace_processor/trace_processor.proto"; -import "protos/perfetto/cloud_trace_processor/common.proto"; // Interface for a CloudTraceProcessor "Worker". // -// Workers are are owned by a |Orchestrator| who assigns groups of traces to -// them (known as a PoolShards) and forwards any requests from end users. -// Workers are reponsible for loading assigned traces with TraceProcessor and -// executing the requests. +// Workers are owned by an |Orchestrator| which assigns traces and forwards any +// requests from end users. Workers are responsible for loading assigned traces +// with TraceProcessor and executing the requests. service CloudTraceProcessorWorker { - // Creates a TracePoolShard which will be owned by this worker and returns - // whether it was successfully created. - // - // Orchestrators are responsible for handling groups of traces which the user - // has requested to be loaded: these are known as TracePools. The orchestrator - // then breaks these pools into pieces and shards them out to workers, each of - // which is known as a TracePoolShard. - // - // Thus, a TracePoolShard is unique identified by the tuple (worker, pool id). - rpc TracePoolShardCreate(TracePoolShardCreateArgs) - returns (TracePoolShardCreateResponse); - - // Associates the provided list of traces to this TracePoolShard and returns - // a stream with each element indicating the successful load of one trace - // (which allows monitoring the progress of loads) or a terminal error if the - // assignment of any trace failed. - // - // If this operation completes successfully, any future requests to this pool - // shard will refer to this set of traces. - rpc TracePoolShardSetTraces(TracePoolShardSetTracesArgs) - returns (stream TracePoolShardSetTracesResponse); - - // Executes a SQL query on the specified TracePoolShard and returns a stream - // with each element being the response for executing the query on the - // associated trace. - // - // Note that each trace can return >1 result due to chunking of protos at the + // Synchronize the state of the traces in the worker to the orchestrator. The + // orchestrator uses this method to ensure the worker is always fully up to + // date with the state according to the orchestrator. This makes the system + // resilient to worker restarts (i.e. loss of local state). + rpc SyncTraceState(SyncTraceStateArgs) + returns (stream SyncTraceStateResponse); + + // Executes a SQL query on the specified trace and returns a stream of + // execution responses. Note that this method returns a stream because each + // trace can return >1 result due to chunking of protos at the // TraceProcessor::QueryResult level. - rpc TracePoolShardQuery(TracePoolShardQueryArgs) - returns (stream TracePoolShardQueryResponse); - - // Destroys the TracePoolShard with the specified id. - // - // Any future requests to this shard id will return an error. However, the - // same pool id can be used to create a new shard. - rpc TracePoolShardDestroy(TracePoolShardDestroyArgs) - returns (TracePoolShardDestroyResponse); -} - -// Request/Response for Worker::TracePoolShardCreate. -message TracePoolShardCreateArgs { - optional string pool_id = 1; - optional TracePoolType pool_type = 2; + rpc QueryTrace(QueryTraceArgs) returns (stream QueryTraceResponse); } -message TracePoolShardCreateResponse {} -// Request/Response for Worker::TracePoolShardSetTraces. -message TracePoolShardSetTracesArgs { - optional string pool_id = 1; - - // The list of traces which should be associated with this shard. The existing - // loaded trace list will be diffed against this list. Traces not present in - // this list and loaded will be unloaded while traces present in this list - // and unloaded will be loaded. - repeated string traces = 2; +// Request/Response for Worker::Sync. +message SyncTraceStateArgs { + repeated string traces = 1; } -message TracePoolShardSetTracesResponse { - optional string trace = 1; +message SyncTraceStateResponse { + optional double load_progress = 1; } -// Request/Response for Worker::TracePoolShardQuery. -message TracePoolShardQueryArgs { - optional string pool_id = 1; +// Request/Response for Worker::TraceQuery. +message QueryTraceArgs { + optional string trace = 1; optional string sql_query = 2; } -message TracePoolShardQueryResponse { +message QueryTraceResponse { optional string trace = 1; optional QueryResult result = 2; } - -// Request/Response for Worker::TracePoolShardDestroy. -message TracePoolShardDestroyArgs { - optional string pool_id = 1; -} -message TracePoolShardDestroyResponse {} diff --git a/src/cloud_trace_processor/orchestrator_impl.cc b/src/cloud_trace_processor/orchestrator_impl.cc index 69b831fade..bc22e9806f 100644 --- a/src/cloud_trace_processor/orchestrator_impl.cc +++ b/src/cloud_trace_processor/orchestrator_impl.cc @@ -17,17 +17,23 @@ #include "src/cloud_trace_processor/orchestrator_impl.h" #include +#include #include +#include #include #include #include "perfetto/base/status.h" +#include "perfetto/base/task_runner.h" #include "perfetto/ext/base/flat_hash_map.h" +#include "perfetto/ext/base/periodic_task.h" #include "perfetto/ext/base/status_or.h" #include "perfetto/ext/base/threading/future.h" +#include "perfetto/ext/base/threading/poll.h" +#include "perfetto/ext/base/threading/spawn.h" #include "perfetto/ext/base/threading/stream.h" +#include "perfetto/ext/cloud_trace_processor/orchestrator.h" #include "perfetto/ext/cloud_trace_processor/worker.h" -#include "protos/perfetto/cloud_trace_processor/common.pb.h" #include "protos/perfetto/cloud_trace_processor/orchestrator.pb.h" #include "protos/perfetto/cloud_trace_processor/worker.pb.h" #include "src/trace_processor/util/status_macros.h" @@ -36,19 +42,8 @@ namespace perfetto { namespace cloud_trace_processor { namespace { -base::Future CreateResponseToStatus( - base::StatusOr response_or) { - return response_or.status(); -} - -base::Future SetTracesResponseToStatus( - base::StatusOr response_or) { - return response_or.status(); -} - base::Future> -RpcResponseToPoolResponse( - base::StatusOr resp) { +RpcResponseToPoolResponse(base::StatusOr resp) { RETURN_IF_ERROR(resp.status()); protos::TracePoolQueryResponse ret; ret.set_trace(std::move(resp->trace())); @@ -56,77 +51,47 @@ RpcResponseToPoolResponse( return ret; } -base::StatusOrStream -RoundRobinSetTraces(const std::vector>& workers, - const std::vector& traces) { - uint32_t worker_idx = 0; - std::vector protos; - protos.resize(workers.size()); - for (const auto& trace : traces) { - protos[worker_idx].add_traces(trace); - worker_idx = (worker_idx + 1) % workers.size(); - } +// The period of sync of state from the orchestrator to all the workers. This +// constant trades freshness (i.e. lower period) vs unnecessary work (i.e. +// higher period). 15s seems an acceptable number even for interactive trace +// loads. +static constexpr uint32_t kDefaultWorkerSyncPeriod = 15000; - using ShardResponse = protos::TracePoolShardSetTracesResponse; - std::vector> streams; - for (uint32_t i = 0; i < protos.size(); ++i) { - streams.emplace_back(workers[i]->TracePoolShardSetTraces(protos[i])); - } - return base::FlattenStreams(std::move(streams)); -} } // namespace Orchestrator::~Orchestrator() = default; std::unique_ptr Orchestrator::CreateInProcess( + base::TaskRunner* task_runner, std::vector> workers) { return std::unique_ptr( - new OrchestratorImpl(std::move(workers))); + new OrchestratorImpl(task_runner, std::move(workers))); } -OrchestratorImpl::OrchestratorImpl(std::vector> workers) - : workers_(std::move(workers)) {} +OrchestratorImpl::OrchestratorImpl(base::TaskRunner* task_runner, + std::vector> workers) + : task_runner_(task_runner), + periodic_sync_task_(task_runner), + workers_(std::move(workers)) { + base::PeriodicTask::Args args; + args.task = [this] { ExecuteSyncWorkers(); }; + args.period_ms = kDefaultWorkerSyncPeriod; + args.start_first_task_immediately = true; + periodic_sync_task_.Start(std::move(args)); +} base::StatusOrFuture OrchestratorImpl::TracePoolCreate(const protos::TracePoolCreateArgs& args) { - if (args.pool_type() != protos::TracePoolType::SHARED) { - return base::StatusOr( - base::ErrStatus("Currently only SHARED pools are supported")); - } - if (!args.has_shared_pool_name()) { + if (!args.has_pool_name()) { return base::StatusOr( - base::ErrStatus("Pool name must be provided for SHARED pools")); + base::ErrStatus("Pool name must be provided")); } - - std::string id = "shared:" + args.shared_pool_name(); - TracePool* exist = pools_.Find(id); - if (exist) { + std::string id = "stateless:" + args.pool_name(); + if (auto it_inserted = pools_.Insert(id, TracePool()); !it_inserted.second) { return base::StatusOr( - base::ErrStatus("Pool %s already exists", id.c_str())); - } - protos::TracePoolShardCreateArgs group_args; - group_args.set_pool_id(id); - group_args.set_pool_type(args.pool_type()); - - using ShardResponse = protos::TracePoolShardCreateResponse; - std::vector> shards; - for (uint32_t i = 0; i < workers_.size(); ++i) { - shards.emplace_back( - base::StreamFromFuture(workers_[i]->TracePoolShardCreate(group_args))); + base::ErrStatus("Pool '%s' already exists", id.c_str())); } - return base::FlattenStreams(std::move(shards)) - .MapFuture(&CreateResponseToStatus) - .Collect(base::AllOkCollector()) - .ContinueWith( - [this, id](base::StatusOr resp) - -> base::StatusOrFuture { - RETURN_IF_ERROR(resp.status()); - auto it_and_inserted = pools_.Insert(id, TracePool()); - if (!it_and_inserted.second) { - return base::ErrStatus("Unable to insert pool %s", id.c_str()); - } - return protos::TracePoolCreateResponse(); - }); + return protos::TracePoolCreateResponse(); } base::StatusOrFuture @@ -138,20 +103,25 @@ OrchestratorImpl::TracePoolSetTraces( return base::StatusOr( base::ErrStatus("Unable to find pool %s", id.c_str())); } - if (!pool->loaded_traces.empty()) { + if (!pool->traces.empty()) { return base::StatusOr(base::ErrStatus( "Incrementally adding/removing items to pool not currently supported")); } - pool->loaded_traces.assign(args.traces().begin(), args.traces().end()); - return RoundRobinSetTraces(workers_, pool->loaded_traces) - .MapFuture(&SetTracesResponseToStatus) - .Collect(base::AllOkCollector()) - .ContinueWith( - [](base::Status status) - -> base::StatusOrFuture { - RETURN_IF_ERROR(status); - return protos::TracePoolSetTracesResponse(); - }); + pool->traces.assign(args.traces().begin(), args.traces().end()); + + uint32_t round_robin_worker_idx = 0; + for (const std::string& trace_path : pool->traces) { + auto it_and_inserted = traces_.Insert(trace_path, Trace()); + it_and_inserted.first->refcount++; + if (it_and_inserted.second) { + it_and_inserted.first->worker = workers_[round_robin_worker_idx].get(); + // Set the worker index to the next worker in a round-robin fashion. + round_robin_worker_idx = (round_robin_worker_idx + 1) % workers_.size(); + } else { + PERFETTO_CHECK(it_and_inserted.first); + } + } + return protos::TracePoolSetTracesResponse(); } base::StatusOrStream @@ -161,14 +131,14 @@ OrchestratorImpl::TracePoolQuery(const protos::TracePoolQueryArgs& args) { return base::StreamOf(base::StatusOr( base::ErrStatus("Unable to find pool %s", args.pool_id().c_str()))); } - protos::TracePoolShardQueryArgs shard_args; - *shard_args.mutable_pool_id() = args.pool_id(); - *shard_args.mutable_sql_query() = args.sql_query(); - - using ShardResponse = protos::TracePoolShardQueryResponse; - std::vector> streams; - for (uint32_t i = 0; i < workers_.size(); ++i) { - streams.emplace_back(workers_[i]->TracePoolShardQuery(shard_args)); + + std::vector> streams; + protos::QueryTraceArgs query_args; + *query_args.mutable_sql_query() = args.sql_query(); + for (const std::string& trace_path : pool->traces) { + auto* trace = traces_.Find(trace_path); + *query_args.mutable_trace() = trace_path; + streams.emplace_back(trace->worker->QueryTrace(query_args)); } return base::FlattenStreams(std::move(streams)) .MapFuture(&RpcResponseToPoolResponse); @@ -182,28 +152,64 @@ OrchestratorImpl::TracePoolDestroy(const protos::TracePoolDestroyArgs& args) { return base::StatusOr( base::ErrStatus("Unable to find pool %s", id.c_str())); } - protos::TracePoolShardDestroyArgs shard_args; - *shard_args.mutable_pool_id() = id; - - using ShardResponse = protos::TracePoolShardDestroyResponse; - std::vector> streams; - for (uint32_t i = 0; i < workers_.size(); ++i) { - streams.emplace_back( - base::StreamFromFuture(workers_[i]->TracePoolShardDestroy(shard_args))); + std::unordered_set to_erase; + for (auto it = traces_.GetIterator(); it; ++it) { + PERFETTO_CHECK(it.value().refcount-- > 0); + if (it.value().refcount == 0) { + to_erase.emplace(it.key()); + } + } + for (const std::string& trace_path : to_erase) { + traces_.Erase(trace_path); + } + PERFETTO_CHECK(pools_.Erase(id)); + return protos::TracePoolDestroyResponse(); +} + +void OrchestratorImpl::ExecuteSyncWorkers() { + if (periodic_sync_handle_) { + return; + } + periodic_sync_handle_ = base::SpawnFuture(task_runner_, [this]() { + return SyncWorkers().ContinueWith([this](base::Status status) { + if (!status.ok()) { + PERFETTO_ELOG("%s", status.c_message()); + } + periodic_sync_handle_ = std::nullopt; + return base::Future(base::FVoid()); + }); + }); +} + +void OrchestratorImpl::ExecuteForceSyncWorkers() { + // Destroy the sync handle to cancel any currently running sync. + periodic_sync_handle_ = std::nullopt; + ExecuteSyncWorkers(); +} + +base::StatusFuture OrchestratorImpl::SyncWorkers() { + std::vector> streams; + base::FlatHashMap> worker_to_traces; + for (auto it = traces_.GetIterator(); it; ++it) { + auto it_and_inserted = worker_to_traces.Insert(it.value().worker, {}); + it_and_inserted.first->emplace_back(it.key()); + } + for (auto& worker : workers_) { + auto* traces = worker_to_traces.Find(worker.get()); + if (!traces) { + continue; + } + protos::SyncTraceStateArgs args; + for (const auto& trace : *traces) { + args.add_traces(trace); + } + streams.push_back(worker->SyncTraceState(std::move(args))); } return base::FlattenStreams(std::move(streams)) - .MapFuture( - [](base::StatusOr resp) -> base::Future { - return resp.status(); - }) - .Collect(base::AllOkCollector()) - .ContinueWith( - [this, id](base::Status status) - -> base::StatusOrFuture { - RETURN_IF_ERROR(status); - PERFETTO_CHECK(pools_.Erase(id)); - return protos::TracePoolDestroyResponse(); - }); + .MapFuture([](base::StatusOr resp) { + return base::StatusFuture(resp.status()); + }) + .Collect(base::AllOkCollector()); } } // namespace cloud_trace_processor diff --git a/src/cloud_trace_processor/orchestrator_impl.h b/src/cloud_trace_processor/orchestrator_impl.h index eef55e5746..39b9f0295d 100644 --- a/src/cloud_trace_processor/orchestrator_impl.h +++ b/src/cloud_trace_processor/orchestrator_impl.h @@ -18,10 +18,14 @@ #define SRC_CLOUD_TRACE_PROCESSOR_ORCHESTRATOR_IMPL_H_ #include +#include #include +#include "perfetto/base/task_runner.h" #include "perfetto/ext/base/flat_hash_map.h" +#include "perfetto/ext/base/periodic_task.h" #include "perfetto/ext/base/threading/future.h" +#include "perfetto/ext/base/threading/spawn.h" #include "perfetto/ext/cloud_trace_processor/orchestrator.h" namespace perfetto { @@ -33,7 +37,8 @@ namespace cloud_trace_processor { class OrchestratorImpl : public Orchestrator { public: - explicit OrchestratorImpl(std::vector> workers); + explicit OrchestratorImpl(base::TaskRunner*, + std::vector>); base::StatusOrStream TracePoolQuery( const protos::TracePoolQueryArgs&) override; @@ -49,10 +54,23 @@ class OrchestratorImpl : public Orchestrator { private: struct TracePool { - std::vector loaded_traces; + std::vector traces; }; + struct Trace { + Worker* worker = nullptr; + uint32_t refcount = 0; + }; + void ExecuteSyncWorkers(); + void ExecuteForceSyncWorkers(); + base::StatusFuture SyncWorkers(); + + base::TaskRunner* task_runner_ = nullptr; + base::PeriodicTask periodic_sync_task_; + std::optional periodic_sync_handle_; + std::vector> workers_; base::FlatHashMap pools_; + base::FlatHashMap traces_; }; } // namespace cloud_trace_processor diff --git a/src/cloud_trace_processor/trace_processor_wrapper.cc b/src/cloud_trace_processor/trace_processor_wrapper.cc index 5093e5285d..6b51312f6e 100644 --- a/src/cloud_trace_processor/trace_processor_wrapper.cc +++ b/src/cloud_trace_processor/trace_processor_wrapper.cc @@ -62,7 +62,7 @@ struct QueryRunner { trace_path(std::move(_trace_path)), statefulness(_statefulness) {} - std::optional operator()() { + std::optional operator()() { if (!has_more) { if (statefulness == Statefulness::kStateless) { tp->RestoreInitialTables(); @@ -74,7 +74,7 @@ struct QueryRunner { EnsureSerializerExists(); has_more = serializer->Serialize(&result); - protos::TracePoolShardQueryResponse resp; + protos::QueryTraceResponse resp; *resp.mutable_trace() = trace_path; resp.mutable_result()->ParseFromArray(result.data(), static_cast(result.size())); @@ -121,28 +121,30 @@ base::StatusFuture TraceProcessorWrapper::LoadTrace( } return std::move(file_stream) .MapFuture( - [this](base::StatusOr> d) -> base::StatusFuture { + [thread_pool = thread_pool_, tp = trace_processor_]( + base::StatusOr> d) -> base::StatusFuture { RETURN_IF_ERROR(d.status()); return base::RunOnceOnThreadPool( - thread_pool_, [res = std::move(*d), tp = trace_processor_] { + thread_pool, [res = std::move(*d), tp = std::move(tp)] { return tp->Parse(TraceBlobView( TraceBlob::CopyFrom(res.data(), res.size()))); }); }) .Collect(base::AllOkCollector()) - .ContinueWith([this](base::Status status) -> base::StatusFuture { + .ContinueWith([thread_pool = thread_pool_, tp = trace_processor_]( + base::Status status) -> base::StatusFuture { RETURN_IF_ERROR(status); return base::RunOnceOnThreadPool( - thread_pool_, [tp = trace_processor_] { + thread_pool, [tp = std::move(tp)] { tp->NotifyEndOfFile(); return base::OkStatus(); }); }); } -base::StatusOrStream -TraceProcessorWrapper::Query(const std::string& query) { - using StatusOrResponse = base::StatusOr; +base::StatusOrStream TraceProcessorWrapper::Query( + const std::string& query) { + using StatusOrResponse = base::StatusOr; if (trace_processor_.use_count() != 1) { return base::StreamOf( base::ErrStatus("Request is already in flight")); diff --git a/src/cloud_trace_processor/trace_processor_wrapper.h b/src/cloud_trace_processor/trace_processor_wrapper.h index 9916b72255..ac64a5cb3d 100644 --- a/src/cloud_trace_processor/trace_processor_wrapper.h +++ b/src/cloud_trace_processor/trace_processor_wrapper.h @@ -26,7 +26,7 @@ namespace perfetto { namespace protos { -class TracePoolShardQueryResponse; +class QueryTraceResponse; } // namespace protos } // namespace perfetto @@ -58,7 +58,7 @@ class TraceProcessorWrapper { // Executes the given query on the trace processor and returns the results // as a stream. - base::StatusOrStream Query( + base::StatusOrStream Query( const std::string& sql); private: diff --git a/src/cloud_trace_processor/worker_impl.cc b/src/cloud_trace_processor/worker_impl.cc index 6f115601d1..b351034ef9 100644 --- a/src/cloud_trace_processor/worker_impl.cc +++ b/src/cloud_trace_processor/worker_impl.cc @@ -18,11 +18,16 @@ #include +#include "perfetto/base/logging.h" #include "perfetto/base/status.h" +#include "perfetto/base/task_runner.h" #include "perfetto/ext/base/status_or.h" +#include "perfetto/ext/base/threading/future.h" +#include "perfetto/ext/base/threading/spawn.h" #include "perfetto/ext/base/threading/stream.h" +#include "perfetto/ext/base/threading/util.h" #include "perfetto/ext/base/uuid.h" -#include "protos/perfetto/cloud_trace_processor/common.pb.h" +#include "perfetto/ext/cloud_trace_processor/worker.h" #include "protos/perfetto/cloud_trace_processor/orchestrator.pb.h" #include "protos/perfetto/cloud_trace_processor/worker.pb.h" #include "src/cloud_trace_processor/trace_processor_wrapper.h" @@ -33,85 +38,60 @@ namespace cloud_trace_processor { Worker::~Worker() = default; -std::unique_ptr Worker::CreateInProcesss(CtpEnvironment* environment, +std::unique_ptr Worker::CreateInProcesss(base::TaskRunner* runner, + CtpEnvironment* environment, base::ThreadPool* pool) { - return std::make_unique(environment, pool); + return std::make_unique(runner, environment, pool); } -WorkerImpl::WorkerImpl(CtpEnvironment* environment, base::ThreadPool* pool) - : environment_(environment), thread_pool_(pool) {} +WorkerImpl::WorkerImpl(base::TaskRunner* runner, + CtpEnvironment* environment, + base::ThreadPool* pool) + : task_runner_(runner), environment_(environment), thread_pool_(pool) {} -base::StatusOrFuture -WorkerImpl::TracePoolShardCreate(const protos::TracePoolShardCreateArgs& args) { - if (args.pool_type() == protos::TracePoolType::DEDICATED) { - return base::ErrStatus("Dedicated pools are not currently supported"); - } - auto it_and_inserted = shards_.Insert(args.pool_id(), TracePoolShard()); - if (!it_and_inserted.second) { - return base::ErrStatus("Shard for pool %s already exists", - args.pool_id().c_str()); - } - return base::StatusOr(protos::TracePoolShardCreateResponse()); -} - -base::StatusOrStream -WorkerImpl::TracePoolShardSetTraces( - const protos::TracePoolShardSetTracesArgs& args) { - using Response = protos::TracePoolShardSetTracesResponse; - using StatusOrResponse = base::StatusOr; - - TracePoolShard* shard = shards_.Find(args.pool_id()); - if (!shard) { - return base::StreamOf(base::ErrStatus( - "Unable to find shard for pool %s", args.pool_id().c_str())); - } - - std::vector> streams; +base::StatusOrStream WorkerImpl::SyncTraceState( + const protos::SyncTraceStateArgs& args) { + base::FlatHashMap new_traces; + std::vector streams; for (const std::string& trace : args.traces()) { - // TODO(lalitm): add support for stateful trace processor in dedicated - // pools. + if (auto* ptr = traces_.Find(trace); ptr) { + auto it_and_inserted = new_traces.Insert(trace, std::move(*ptr)); + PERFETTO_CHECK(it_and_inserted.second); + continue; + } + auto [handle, stream] = + base::SpawnResultFuture(task_runner_, [this, trace] { + auto t = traces_.Find(trace); + if (!t) { + return base::StatusFuture( + base::ErrStatus("%s: trace not found", trace.c_str())); + } + return t->wrapper->LoadTrace(environment_->ReadFile(trace)); + }); auto tp = std::make_unique( trace, thread_pool_, TraceProcessorWrapper::Statefulness::kStateless); - auto load_trace_future = - tp->LoadTrace(environment_->ReadFile(trace)) - .ContinueWith( - [trace](base::Status status) -> base::Future { - RETURN_IF_ERROR(status); - protos::TracePoolShardSetTracesResponse resp; - *resp.mutable_trace() = trace; - return resp; - }); - streams.emplace_back(base::StreamFromFuture(std::move(load_trace_future))); - shard->tps.emplace_back(std::move(tp)); - } - return base::FlattenStreams(std::move(streams)); -} - -base::StatusOrStream -WorkerImpl::TracePoolShardQuery(const protos::TracePoolShardQueryArgs& args) { - using Response = protos::TracePoolShardQueryResponse; - using StatusOrResponse = base::StatusOr; - TracePoolShard* shard = shards_.Find(args.pool_id()); - if (!shard) { - return base::StreamOf(base::ErrStatus( - "Unable to find shard for pool %s", args.pool_id().c_str())); - } - std::vector> streams; - streams.reserve(shard->tps.size()); - for (std::unique_ptr& tp : shard->tps) { - streams.emplace_back(tp->Query(args.sql_query())); + streams.emplace_back(base::StreamFromFuture(std::move(stream))); + new_traces.Insert(trace, Trace{std::move(tp), std::move(handle)}); } - return base::FlattenStreams(std::move(streams)); + traces_ = std::move(new_traces); + return base::FlattenStreams(std::move(streams)) + .MapFuture([](base::Status status) { + if (!status.ok()) { + return base::StatusOrFuture(status); + } + return base::StatusOrFuture( + protos::SyncTraceStateResponse()); + }); } -base::StatusOrFuture -WorkerImpl::TracePoolShardDestroy( - const protos::TracePoolShardDestroyArgs& args) { - if (!shards_.Erase(args.pool_id())) { - return base::ErrStatus("Unable to find shard for pool %s", - args.pool_id().c_str()); +base::StatusOrStream WorkerImpl::QueryTrace( + const protos::QueryTraceArgs& args) { + auto* tp = traces_.Find(args.trace()); + if (!tp) { + return base::StreamOf>( + base::ErrStatus("%s: trace not found", args.trace().c_str())); } - return base::StatusOr(protos::TracePoolShardDestroyResponse()); + return tp->wrapper->Query(args.sql_query()); } } // namespace cloud_trace_processor diff --git a/src/cloud_trace_processor/worker_impl.h b/src/cloud_trace_processor/worker_impl.h index c7dc7548d3..224fc3bbbb 100644 --- a/src/cloud_trace_processor/worker_impl.h +++ b/src/cloud_trace_processor/worker_impl.h @@ -17,9 +17,14 @@ #ifndef SRC_CLOUD_TRACE_PROCESSOR_WORKER_IMPL_H_ #define SRC_CLOUD_TRACE_PROCESSOR_WORKER_IMPL_H_ +#include +#include +#include #include +#include "perfetto/base/task_runner.h" #include "perfetto/ext/base/flat_hash_map.h" +#include "perfetto/ext/base/threading/spawn.h" #include "perfetto/ext/base/threading/thread_pool.h" #include "perfetto/ext/cloud_trace_processor/environment.h" #include "perfetto/ext/cloud_trace_processor/worker.h" @@ -38,27 +43,25 @@ namespace cloud_trace_processor { class WorkerImpl : public Worker { public: - explicit WorkerImpl(CtpEnvironment*, base::ThreadPool*); + explicit WorkerImpl(base::TaskRunner*, CtpEnvironment*, base::ThreadPool*); - base::StatusOrFuture - TracePoolShardCreate(const protos::TracePoolShardCreateArgs&) override; + // Synchronize the state of the traces in the worker to the orchestrator. + base::StatusOrStream SyncTraceState( + const protos::SyncTraceStateArgs&) override; - base::StatusOrStream - TracePoolShardSetTraces(const protos::TracePoolShardSetTracesArgs&) override; - - base::StatusOrStream TracePoolShardQuery( - const protos::TracePoolShardQueryArgs&) override; - - base::StatusOrFuture - TracePoolShardDestroy(const protos::TracePoolShardDestroyArgs&) override; + // Executes a SQL query on the specified trace. + base::StatusOrStream QueryTrace( + const protos::QueryTraceArgs&) override; private: - struct TracePoolShard { - std::vector> tps; + struct Trace { + std::unique_ptr wrapper; + base::SpawnHandle load_handle; }; + base::TaskRunner* const task_runner_; CtpEnvironment* const environment_; base::ThreadPool* const thread_pool_; - base::FlatHashMap shards_; + base::FlatHashMap traces_; }; } // namespace cloud_trace_processor