Ballista Executor report plan/operators metrics to Ballista Scheduler when task finish #116

Closed
mingmwang opened this issue Aug 5, 2022 · 3 comments · Fixed by #124
Labels
enhancement New feature or request

Comments

@mingmwang
Contributor

mingmwang commented Aug 5, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

In the current Ballista implementation, when an Executor finishes a task, only the task status (Failed/Complete) is reported back to the Scheduler. The plan metrics are not collected or combined in the Scheduler, so the Scheduler has no overall view of plan execution metrics, such as how many rows were scanned or the total elapsed time of each operator.

Describe the solution you'd like

  1. On the Executor side, when a task finishes, collect the metrics of all plan operators for that task.
  2. Send the plan metrics back to the Scheduler along with the TaskStatus.
  3. The Scheduler needs to combine/aggregate the metrics from all completed tasks and then update the plan metrics on the physical plan held by the Scheduler.
  4. Ideally, DataFusion would add a unique id to ExecutionPlanMetricsSet so that the Ballista Scheduler can easily combine and aggregate metrics.
  5. Combining/aggregating the metrics could be quite expensive, since a heavy stage might have tens of thousands of tasks. The aggregation should therefore run asynchronously, should not affect the main stage/task scheduling and task result processing flow, and should be allowed to drop metrics if it cannot keep up.
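
As a rough illustration of steps 3 and 4, the Scheduler-side merge could look something like the sketch below. All names here (`OperatorMetrics`, `TaskMetrics`, `StageMetrics`) are hypothetical and not part of Ballista or DataFusion; the `usize` operator id stands in for the unique id proposed in step 4, and the two counters are a simplification of what DataFusion actually records.

```rust
use std::collections::HashMap;

/// Hypothetical per-operator counters; real DataFusion metrics are richer.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct OperatorMetrics {
    pub output_rows: u64,
    pub elapsed_nanos: u64,
}

/// Metrics reported by one finished task: (operator id, counters).
/// The operator id is an assumption standing in for the id from step 4.
pub type TaskMetrics = Vec<(usize, OperatorMetrics)>;

/// Scheduler-side accumulator for one stage (hypothetical name).
#[derive(Debug, Default)]
pub struct StageMetrics {
    per_operator: HashMap<usize, OperatorMetrics>,
    tasks_merged: usize,
}

impl StageMetrics {
    /// Fold the metrics of one completed task into the stage totals.
    pub fn merge_task(&mut self, task: &TaskMetrics) {
        for (operator_id, m) in task {
            let total = self.per_operator.entry(*operator_id).or_default();
            total.output_rows += m.output_rows;
            total.elapsed_nanos += m.elapsed_nanos;
        }
        self.tasks_merged += 1;
    }

    /// Aggregated counters for one operator, if any task reported it.
    pub fn get(&self, operator_id: usize) -> Option<OperatorMetrics> {
        self.per_operator.get(&operator_id).copied()
    }

    /// Number of task reports merged so far.
    pub fn tasks_merged(&self) -> usize {
        self.tasks_merged
    }
}
```

Keying the map by operator id is what makes the merge cheap: each task report is folded in with one lookup per operator, without walking the physical plan.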

Describe alternatives you've considered

Additional context

@thinkharderdev
Contributor

In our project we've used the ExecutorMetricsCollector for this purpose and it seems to work relatively well. Some additional plumbing is needed, since you have to have an RPC service on the scheduler side which can collect the metrics (and store them somewhere).

But basically what it looks like is you have something like:

struct MetricsPublisher {
    rx: Receiver<StageCompletion>,
    metrics_client: QueryManagerServiceClient<tonic::transport::Channel>,
}

which will listen for completion events and call the scheduler rpc in the background to publish them for aggregation

and then something like

pub struct ExecutionMetricsCollector {
    prometheus_metrics: PrometheusMetrics,
    // Sending half of the channel that MetricsPublisher drains.
    sender: Sender<StageCompletion>,
}

impl ExecutorMetricsCollector for ExecutionMetricsCollector {
    fn record_stage(
        &self,
        job_id: &str,
        stage_id: usize,
        partition: usize,
        plan: ShuffleWriterExec,
    ) {
        // try_send does not block the task: if the send fails, the
        // completion event is dropped and a warning is logged.
        self.sender
            .try_send(StageCompletion {
                partition_id: partition,
                shuffle_write: plan,
            })
            .unwrap_or_else(|err| {
                warn!(
                    job_id,
                    stage_id, partition, "error sending stage completion to channel: {:?}", err
                )
            });
    }
}

Then of course you need to serialize the ExecutionPlanMetricsSet to a protobuf struct, but that is relatively straightforward.
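
For readers who want to try the collector/publisher pattern in isolation, here is a minimal, self-contained sketch using only the standard library. It is not the code from our project: `Completion`, `Collector`, `metrics_channel` and `spawn_publisher` are hypothetical names, a plain thread stands in for the background task, and the `publish` closure stands in for the scheduler RPC call. The point it shows is that a bounded queue plus `try_send` lets the executor drop metrics rather than block when the publisher cannot keep up.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for the `StageCompletion` event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Completion {
    pub partition_id: usize,
    pub output_rows: u64,
}

/// Executor-side handle; recording an event never blocks the caller.
pub struct Collector {
    sender: SyncSender<Completion>,
}

impl Collector {
    /// Returns `true` if the event was queued, `false` if it was dropped
    /// because the queue was full or the publisher has gone away.
    pub fn record(&self, event: Completion) -> bool {
        match self.sender.try_send(event) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => false,
        }
    }
}

/// Creates a collector and the receiving end of a queue that holds
/// at most `capacity` pending events.
pub fn metrics_channel(capacity: usize) -> (Collector, Receiver<Completion>) {
    let (sender, rx) = sync_channel(capacity);
    (Collector { sender }, rx)
}

/// Background publisher: drains the queue until every sender is dropped
/// and returns how many events it published.
/// `publish` is an assumption standing in for the scheduler RPC call.
pub fn spawn_publisher<F>(rx: Receiver<Completion>, mut publish: F) -> JoinHandle<usize>
where
    F: FnMut(Completion) + Send + 'static,
{
    thread::spawn(move || {
        let mut published = 0;
        for event in rx {
            publish(event);
            published += 1;
        }
        published
    })
}
```

With a capacity of 2 and no publisher running yet, the first two `record` calls are queued and the third returns `false`; once the publisher starts and the collector is dropped, the publisher drains the two queued events and exits.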

@thinkharderdev
Contributor

We can take a look at upstreaming what we have

@mingmwang
Contributor Author

mingmwang commented Aug 8, 2022

@thinkharderdev I think we already have an RPC for the task status update. We can enhance the existing method to report task metrics to the Scheduler instead of adding another RPC.

rpc UpdateTaskStatus (UpdateTaskStatusParams) returns (UpdateTaskStatusResult) {}
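
To make the idea concrete, the payload carried by the existing status update might be shaped roughly like the sketch below. These are plain Rust types written only for illustration: `Outcome`, `OperatorCounters`, `TaskStatusReport` and `build_report` are hypothetical names and do not reflect the actual Ballista protobuf definitions.

```rust
/// Hypothetical task outcome, mirroring the Failed/Complete states.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Outcome {
    Completed,
    Failed(String),
}

/// Hypothetical per-operator counters attached to a status update.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OperatorCounters {
    pub operator_id: usize,
    pub output_rows: u64,
    pub elapsed_nanos: u64,
}

/// Hypothetical status report: the existing status plus optional metrics.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskStatusReport {
    pub job_id: String,
    pub stage_id: usize,
    pub partition_id: usize,
    pub outcome: Outcome,
    /// Empty when the task failed or no metrics were collected.
    pub metrics: Vec<OperatorCounters>,
}

/// Builds the report the executor would send once a task ends.
/// `result` holds the collected metrics on success or an error message.
pub fn build_report(
    job_id: &str,
    stage_id: usize,
    partition_id: usize,
    result: Result<Vec<OperatorCounters>, String>,
) -> TaskStatusReport {
    let (outcome, metrics) = match result {
        Ok(metrics) => (Outcome::Completed, metrics),
        Err(message) => (Outcome::Failed(message), Vec::new()),
    };
    TaskStatusReport {
        job_id: job_id.to_string(),
        stage_id,
        partition_id,
        outcome,
        metrics,
    }
}
```

Because the metrics travel in the same message as the status, the Scheduler can apply the status change immediately and hand the metrics to a separate aggregation path.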
