Skip to content

Commit

Permalink
ctp: overhaul worker API to make everything more resilient
Browse files Browse the repository at this point in the history
This CL attempts to make CTP more resilient to workers restarting.
Specifically on Google infra, restarts of workers are fully expected
every few days and we need to be able to survive these.

The only real way to do this is to make all the worker state
ephemeral and make the orchestrator the source of truth. We do this
by moving to a "reconciliation" (i.e. Kubernetes) style model. Concretely, the orchestrator holds onto all the state and pushes
this out to the workers every 15s. That way, even if a worker
restarts, it'll be in sync within some time.

This allows significant simplifications of the worker APIs as we no
longer need to track create/destroy etc., only the presence/absence of
stateless traces. While doing this, also strip out all notion of
stateful trace processing which will need some serious thoughts on
designing this without losing state.

In the future, we might even make the manager stateless by moving that
state to a database system which can be accessed by all the components.
That way, the worker can pull the state rather than the manager pushing
it out. That again requires some significant thought and introduces yet
another component so we don't do that in this CL.

Change-Id: I8aadce37f86fbaddab59a512ceae61b9f7de755f
Bug: 278208757
  • Loading branch information
LalitMaganti committed Sep 2, 2023
1 parent ab09049 commit 2c366a1
Show file tree
Hide file tree
Showing 15 changed files with 261 additions and 354 deletions.
4 changes: 0 additions & 4 deletions Android.bp
Original file line number Diff line number Diff line change
Expand Up @@ -2486,7 +2486,6 @@ cc_test {
genrule {
name: "perfetto_protos_perfetto_cloud_trace_processor_lite_gen",
srcs: [
"protos/perfetto/cloud_trace_processor/common.proto",
"protos/perfetto/cloud_trace_processor/orchestrator.proto",
"protos/perfetto/cloud_trace_processor/worker.proto",
],
Expand All @@ -2495,7 +2494,6 @@ genrule {
],
cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --cpp_out=lite=true:$(genDir)/external/perfetto/ $(in)",
out: [
"external/perfetto/protos/perfetto/cloud_trace_processor/common.pb.cc",
"external/perfetto/protos/perfetto/cloud_trace_processor/orchestrator.pb.cc",
"external/perfetto/protos/perfetto/cloud_trace_processor/worker.pb.cc",
],
Expand All @@ -2505,7 +2503,6 @@ genrule {
genrule {
name: "perfetto_protos_perfetto_cloud_trace_processor_lite_gen_headers",
srcs: [
"protos/perfetto/cloud_trace_processor/common.proto",
"protos/perfetto/cloud_trace_processor/orchestrator.proto",
"protos/perfetto/cloud_trace_processor/worker.proto",
],
Expand All @@ -2514,7 +2511,6 @@ genrule {
],
cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --cpp_out=lite=true:$(genDir)/external/perfetto/ $(in)",
out: [
"external/perfetto/protos/perfetto/cloud_trace_processor/common.pb.h",
"external/perfetto/protos/perfetto/cloud_trace_processor/orchestrator.pb.h",
"external/perfetto/protos/perfetto/cloud_trace_processor/worker.pb.h",
],
Expand Down
1 change: 0 additions & 1 deletion BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3463,7 +3463,6 @@ perfetto_cc_proto_library(
perfetto_proto_library(
name = "protos_perfetto_cloud_trace_processor_protos",
srcs = [
"protos/perfetto/cloud_trace_processor/common.proto",
"protos/perfetto/cloud_trace_processor/orchestrator.proto",
"protos/perfetto/cloud_trace_processor/worker.proto",
],
Expand Down
3 changes: 2 additions & 1 deletion include/perfetto/ext/base/threading/spawn.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ class PolledFuture;
class SpawnHandle {
public:
SpawnHandle(TaskRunner* task_runner, std::function<Future<FVoid>()> fn);
~SpawnHandle();

SpawnHandle(SpawnHandle&&) = default;
SpawnHandle& operator=(SpawnHandle&&) = default;

~SpawnHandle();

private:
SpawnHandle(const SpawnHandle&) = delete;
SpawnHandle& operator=(const SpawnHandle&) = delete;
Expand Down
2 changes: 2 additions & 0 deletions include/perfetto/ext/cloud_trace_processor/orchestrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <vector>

#include "perfetto/base/status.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/status_or.h"
#include "perfetto/ext/base/threading/future.h"
#include "perfetto/ext/base/threading/stream.h"
Expand Down Expand Up @@ -60,6 +61,7 @@ class Orchestrator {
// process or even on another machine); the returned manager will gracefully
// handle this.
static std::unique_ptr<Orchestrator> CreateInProcess(
base::TaskRunner*,
std::vector<std::unique_ptr<Worker>> workers);

// Creates a TracePool with the specified arguments.
Expand Down
38 changes: 13 additions & 25 deletions include/perfetto/ext/cloud_trace_processor/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <memory>
#include <vector>

#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/threading/future.h"
#include "perfetto/ext/base/threading/stream.h"

Expand All @@ -30,17 +31,11 @@ class ThreadPool;
}

namespace protos {
class TracePoolShardCreateArgs;
class TracePoolShardCreateResponse;
class SyncTraceStateArgs;
class SyncTraceStateResponse;

class TracePoolShardSetTracesArgs;
class TracePoolShardSetTracesResponse;

class TracePoolShardQueryArgs;
class TracePoolShardQueryResponse;

class TracePoolShardDestroyArgs;
class TracePoolShardDestroyResponse;
class QueryTraceArgs;
class QueryTraceResponse;
} // namespace protos

namespace cloud_trace_processor {
Expand All @@ -58,24 +53,17 @@ class Worker {
// |CtpEnvironment| and a |ThreadPool|. The |CtpEnvironment| will be used to
// perform any interaction with the OS (e.g. opening and reading files) and
// the |ThreadPool| will be used to dispatch requests to TraceProcessor.
static std::unique_ptr<Worker> CreateInProcesss(CtpEnvironment*,
static std::unique_ptr<Worker> CreateInProcesss(base::TaskRunner*,
CtpEnvironment*,
base::ThreadPool*);

// Creates a TracePoolShard which will be owned by this worker.
virtual base::StatusOrFuture<protos::TracePoolShardCreateResponse>
TracePoolShardCreate(const protos::TracePoolShardCreateArgs&) = 0;

// Associates the provided list of traces to this TracePoolShard.
virtual base::StatusOrStream<protos::TracePoolShardSetTracesResponse>
TracePoolShardSetTraces(const protos::TracePoolShardSetTracesArgs&) = 0;

// Executes a SQL query on the specified TracePoolShard.
virtual base::StatusOrStream<protos::TracePoolShardQueryResponse>
TracePoolShardQuery(const protos::TracePoolShardQueryArgs&) = 0;
// Synchronize the state of the traces in the worker to the orchestrator.
virtual base::StatusOrStream<protos::SyncTraceStateResponse> SyncTraceState(
const protos::SyncTraceStateArgs&) = 0;

// Destroys the TracePoolShard with the specified id.
virtual base::StatusOrFuture<protos::TracePoolShardDestroyResponse>
TracePoolShardDestroy(const protos::TracePoolShardDestroyArgs&) = 0;
// Executes a SQL query on the specified trace.
virtual base::StatusOrStream<protos::QueryTraceResponse> QueryTrace(
const protos::QueryTraceArgs&) = 0;
};

} // namespace cloud_trace_processor
Expand Down
1 change: 0 additions & 1 deletion protos/perfetto/cloud_trace_processor/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import("../../../gn/proto_library.gni")

SOURCES = [
"common.proto",
"orchestrator.proto",
"worker.proto",
]
Expand Down
33 changes: 0 additions & 33 deletions protos/perfetto/cloud_trace_processor/common.proto

This file was deleted.

28 changes: 10 additions & 18 deletions protos/perfetto/cloud_trace_processor/orchestrator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ syntax = "proto2";
package perfetto.protos;

import "protos/perfetto/trace_processor/trace_processor.proto";
import "protos/perfetto/cloud_trace_processor/common.proto";

// RPC interface for a CloudTraceProcessor Orchestrator.
//
Expand All @@ -32,20 +31,16 @@ service CloudTraceProcessorOrchestrator {
// A TracePool is a logical group of traces which can be addressed with a
// single id.
//
// Pools can be "shared" or "dedicated":
// a) a shared pool has the trace processor instances backing the pool shared
// among a group of users. This implicitly means that the pools are
// "stateless" (i.e. do not preserve trace processor state between RPCs) as
// the state of one user should not interfere with the state of another.
// b) a dedicated pool belongs to a single user and can only be accessed
// by that user. These pools are "stateful" i.e. preserve trace processor
// state between RPCs.
// Pools are shared: the trace processor instances backing the pool are shared
// among a group of users. This implicitly means that the pools are
// "stateless" (i.e. do not preserve trace processor state between RPCs) as
// the state of one user should not interfere with the state of another.
rpc TracePoolCreate(TracePoolCreateArgs) returns (TracePoolCreateResponse);

// Changes the set of traces associated with the specified TracePool.
//
// If this operation completes successfully, any future requests to this pool
// shard will refer to this set of traces.
// will refer to this set of traces.
rpc TracePoolSetTraces(TracePoolSetTracesArgs)
returns (TracePoolSetTracesResponse);

Expand All @@ -61,22 +56,19 @@ service CloudTraceProcessorOrchestrator {
// Destroys the TracePool with the specified id.
//
// Any future requests to this pool will return an error. However, the
// same pool id (if a named pool) can be used to create a new pool.
// same pool id can be used to create a new pool.
rpc TracePoolDestroy(TracePoolDestroyArgs) returns (TracePoolDestroyResponse);
}

// Request/Response for Orchestrator::TracePoolCreate.
message TracePoolCreateArgs {
optional TracePoolType pool_type = 1;

// If |pool_type| == SHARED, the name which should be used to refer to the pool. This
// name will form part of |pool_id|.
optional string shared_pool_name = 2;
// The name which should be used to refer to the pool. This name will form
// part of |pool_id|.
optional string pool_name = 2;
}
message TracePoolCreateResponse {
// The id of the pool which should be used to reference the pool in all future
// RPCs. For shared pools, this id is expected to be a stable transformation
// of |shared_pool_name|.
// RPCs. This id is expected to be a stable transformation of |pool_name|.
optional string pool_id = 1;
}

Expand Down
92 changes: 23 additions & 69 deletions protos/perfetto/cloud_trace_processor/worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,87 +19,41 @@ syntax = "proto2";
package perfetto.protos;

import "protos/perfetto/trace_processor/trace_processor.proto";
import "protos/perfetto/cloud_trace_processor/common.proto";

// Interface for a CloudTraceProcessor "Worker".
//
// Workers are owned by an |Orchestrator| who assigns groups of traces to
// them (known as PoolShards) and forwards any requests from end users.
// Workers are responsible for loading assigned traces with TraceProcessor and
// executing the requests.
// Workers are owned by an |Orchestrator| which assigns traces and forwards any
// requests from end users. Workers are responsible for loading assigned traces
// with TraceProcessor and executing the requests.
service CloudTraceProcessorWorker {
// Creates a TracePoolShard which will be owned by this worker and returns
// whether it was successfully created.
//
// Orchestrators are responsible for handling groups of traces which the user
// has requested to be loaded: these are known as TracePools. The orchestrator
// then breaks these pools into pieces and shards them out to workers, each of
// which is known as a TracePoolShard.
//
// Thus, a TracePoolShard is unique identified by the tuple (worker, pool id).
rpc TracePoolShardCreate(TracePoolShardCreateArgs)
returns (TracePoolShardCreateResponse);

// Associates the provided list of traces to this TracePoolShard and returns
// a stream with each element indicating the successful load of one trace
// (which allows monitoring the progress of loads) or a terminal error if the
// assignment of any trace failed.
//
// If this operation completes successfully, any future requests to this pool
// shard will refer to this set of traces.
rpc TracePoolShardSetTraces(TracePoolShardSetTracesArgs)
returns (stream TracePoolShardSetTracesResponse);

// Executes a SQL query on the specified TracePoolShard and returns a stream
// with each element being the response for executing the query on the
// associated trace.
//
// Note that each trace can return >1 result due to chunking of protos at the
// Synchronize the state of the traces in the worker to the orchestrator. The
// orchestrator uses this method to ensure the worker is always fully up to
// date with the state according to the orchestrator. This makes the system
// resilient to worker restarts (i.e. loss of local state).
rpc SyncTraceState(SyncTraceStateArgs)
returns (stream SyncTraceStateResponse);

// Executes a SQL query on the specified trace and returns a stream of
// execution responses. Note that this method returns a stream because each
// trace can return >1 result due to chunking of protos at the
// TraceProcessor::QueryResult level.
rpc TracePoolShardQuery(TracePoolShardQueryArgs)
returns (stream TracePoolShardQueryResponse);

// Destroys the TracePoolShard with the specified id.
//
// Any future requests to this shard id will return an error. However, the
// same pool id can be used to create a new shard.
rpc TracePoolShardDestroy(TracePoolShardDestroyArgs)
returns (TracePoolShardDestroyResponse);
}

// Request/Response for Worker::TracePoolShardCreate.
message TracePoolShardCreateArgs {
optional string pool_id = 1;
optional TracePoolType pool_type = 2;
rpc QueryTrace(QueryTraceArgs) returns (stream QueryTraceResponse);
}
message TracePoolShardCreateResponse {}

// Request/Response for Worker::TracePoolShardSetTraces.
message TracePoolShardSetTracesArgs {
optional string pool_id = 1;

// The list of traces which should be associated with this shard. The existing
// loaded trace list will be diffed against this list. Traces not present in
// this list and loaded will be unloaded while traces present in this list
// and unloaded will be loaded.
repeated string traces = 2;
// Request/Response for Worker::SyncTraceState.
message SyncTraceStateArgs {
repeated string traces = 1;
}
message TracePoolShardSetTracesResponse {
optional string trace = 1;
message SyncTraceStateResponse {
optional double load_progress = 1;
}

// Request/Response for Worker::TracePoolShardQuery.
message TracePoolShardQueryArgs {
optional string pool_id = 1;
// Request/Response for Worker::QueryTrace.
message QueryTraceArgs {
optional string trace = 1;
optional string sql_query = 2;
}
message TracePoolShardQueryResponse {
message QueryTraceResponse {
optional string trace = 1;
optional QueryResult result = 2;
}

// Request/Response for Worker::TracePoolShardDestroy.
message TracePoolShardDestroyArgs {
optional string pool_id = 1;
}
message TracePoolShardDestroyResponse {}
Loading

0 comments on commit 2c366a1

Please sign in to comment.