Introduce CuratorTaskManager to make an active job be curated by only one scheduler #153

Merged: 6 commits, Aug 27, 2022

Conversation

yahoNanJing
Contributor

Which issue does this PR close?

Closes #130.

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

@yahoNanJing
Contributor Author

Hi @thinkharderdev, @avantgardnerio and @andygrove, could you help review this PR?

A few follow-up PRs will be proposed after this one is merged.

@avantgardnerio
Contributor

Yep! I'll take a look today.

@avantgardnerio left a comment

I'm not super familiar with the Ballista protocol and the lifecycle of the execution graph and stages yet, but at a high level this makes sense to me: if we can keep a job localized to a scheduler, we can cache plans, and that has helped performance at eBay?

If this is code that has been in use and is just getting upstreamed now, then I think this PR is pretty safe to merge. The code and comments all look like an improvement, and the logical change makes sense to me.

I'd love to hear if @andygrove or @thinkharderdev have any deeper observations, but this LGTM! Thank you for contributing back upstream; your help is very much appreciated, as my company will soon be running into these same issues as we go into production!

let bind_host = opt.bind_host;
let port = opt.bind_port;
let log_dir = opt.log_dir;
let print_thread_info = opt.print_thread_info;

-let scheduler_name = format!("scheduler_{}_{}_{}", namespace, bind_host, port);
+let scheduler_name = format!("scheduler_{}_{}_{}", namespace, external_host, port);
Contributor

Definitely should be external vs bind as bind could and probably should be 0.0.0.0 for all instances of the scheduler.

BallistaCodec::default(),
);
let exec_meta = ExecutorRegistration {
id: "abc".to_owned(),
-optional_host: Some(OptionalHost::Host("http://host:8080".to_owned())),
+optional_host: Some(OptionalHost::Host("http://localhost:8080".to_owned())),
Contributor

I'm a bit wary of localhost here as it will sometimes resolve to IPv6 addresses. Is the rest of the stack capable of supporting those if that is the result of the resolution? 127.0.0.1 might be slightly safer.
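For reference, a minimal standalone sketch (not part of the PR) that asks the system resolver what localhost maps to; depending on the host configuration it may return ::1, 127.0.0.1, or both:

use std::net::ToSocketAddrs;

fn main() {
    // "localhost" goes through the resolver and may yield IPv6 and/or IPv4
    // addresses, while "127.0.0.1" is always a plain IPv4 loopback address.
    for addr in "localhost:8080"
        .to_socket_addrs()
        .expect("failed to resolve localhost")
    {
        println!("{addr}");
    }
}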

Contributor

oh, I see this is just for testing n/m

Contributor Author

Thanks for your suggestion. I agree that 127.0.0.1 is much safer.

@@ -390,7 +390,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
.and_then(|m| {
m.try_into_logical_plan(
session_ctx.deref(),
-self.codec.logical_extension_codec(),
+self.state.codec.logical_extension_codec(),
Contributor

Sorry, I'm daft - why does the codec move out of state? It needs to be unique to the scheduler instance for some reason?

Contributor Author

Since the codec is already a field of the state and is a necessary field for the state, I think it's better for the state to own the codec rather than the server. And it's better to keep a unique instance of the codec in case we change it in the future.

Contributor

Oh, I'm sorry, I got the direction wrong here: it went into state - a mistake with my diff tool 👍

if let Some(task_id) = status.task_id.as_ref() {
task_statuses[task_id.partition_id as usize] = status.status
for graph_stage in proto.stages {
let stage_type = graph_stage.stage_type.expect("Unexpected empty stage");
Contributor

I'd love to see this turned into a ?.

Contributor Author

The stage_type is an Option. Can we apply ? to an Option?

Contributor

Trivia: I have recently learned that the ? operator works with Option<>s as well as Result<>s if the outer function itself also returns an Option.

Here it would need to be mapped to a Result<> though:

Suggested change
let stage_type = graph_stage.stage_type.expect("Unexpected empty stage");
let stage_type = graph_stage.stage_type.ok_or(BallistaError::Internal("Unexpected empty stage"))?;
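For context, a minimal standalone sketch of the ok_or pattern being suggested; the error and stage types here are placeholders for illustration, not the real BallistaError and protobuf types:

// Minimal sketch of mapping an Option to a Result so that `?` can be used.
// `StageError` and `GraphStage` are placeholders, not the real Ballista types.
#[derive(Debug)]
enum StageError {
    Internal(String),
}

struct GraphStage {
    stage_type: Option<String>,
}

fn decode_stage(graph_stage: GraphStage) -> Result<String, StageError> {
    // `ok_or_else` converts None into an error, and `?` propagates it.
    let stage_type = graph_stage
        .stage_type
        .ok_or_else(|| StageError::Internal("Unexpected empty stage".to_string()))?;
    Ok(stage_type)
}

fn main() {
    assert!(decode_stage(GraphStage { stage_type: None }).is_err());
    assert_eq!(
        decode_stage(GraphStage {
            stage_type: Some("ShuffleWrite".to_string())
        })
        .unwrap(),
        "ShuffleWrite"
    );
}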

@andygrove
Member

I can only review non-trivial Ballista PRs like this in my free time since my employer has no interest in the project, and I don't have much free time right now. I am happy to approve and merge based on consensus from the people that are actually using the project. Based on the description this sounds like a good improvement.

@thinkharderdev
Contributor

I didn't have spare cycles today to look at this in depth. I'll review more carefully tomorrow.

@thinkharderdev left a comment

Awesome work! This should be a big improvement. Had a few questions and some minor nits.

Comment on lines 162 to 163
/// Revive the execution graph by converting the resolved stages to running changes
/// If existing such change, return true; else false.
Contributor

Suggested change
/// Revive the execution graph by converting the resolved stages to running changes
/// If existing such change, return true; else false.
/// Revive the execution graph by converting the resolved stages to running stages
/// If any stages are converted, return true; else false.

Contributor Author

Agree.

Comment on lines 39 to 40
// For a job fails without its execution graph
JobFailed(String, String),
Contributor

Suggested change
// For a job fails without its execution graph
JobFailed(String, String),
// For a job which failed during planning
JobPlanningFailed(String, String),

Contributor Author

Agree

Comment on lines 50 to 56
pub(super) enum ExecutionStage {
UnResolved(UnResolvedStage),
Resolved(ResolvedStage),
Running(RunningStage),
Completed(CompletedStage),
Failed(FailedStage),
}
Contributor

I really like representing this as an enum!
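As a rough illustration of why the enum representation is attractive, a self-contained sketch (with empty placeholder payload structs, not the real Ballista stage types) showing that matches over the stage state are exhaustive:

// Placeholder payloads; only the enum shape mirrors the snippet above.
struct UnresolvedStage;
struct ResolvedStage;
struct RunningStage;
struct CompletedStage;
struct FailedStage;

enum ExecutionStage {
    Unresolved(UnresolvedStage),
    Resolved(ResolvedStage),
    Running(RunningStage),
    Completed(CompletedStage),
    Failed(FailedStage),
}

impl ExecutionStage {
    /// A human-readable label; the match is exhaustive, so adding a new
    /// variant forces every call site to be revisited at compile time.
    fn label(&self) -> &'static str {
        match self {
            ExecutionStage::Unresolved(_) => "unresolved",
            ExecutionStage::Resolved(_) => "resolved",
            ExecutionStage::Running(_) => "running",
            ExecutionStage::Completed(_) => "completed",
            ExecutionStage::Failed(_) => "failed",
        }
    }
}

fn main() {
    let stage = ExecutionStage::Running(RunningStage);
    assert_eq!(stage.label(), "running");
}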


/// For a stage whose input stages are not all completed, we say it's a unresolved stage
#[derive(Clone)]
pub(super) struct UnResolvedStage {
Contributor

Suggested change
pub(super) struct UnResolvedStage {
pub(super) struct UnresolvedStage {

Contributor Author

Agree

pub(super) error_message: String,
}

impl UnResolvedStage {
Contributor

Suggested change
impl UnResolvedStage {
impl UnresolvedStage {

Contributor Author

Agree

)
.await?;

graph.revive();
Contributor

Why would we need to call revive here?

Contributor Author

Running stages will not be persisted to the backend. Before caching the execution_graph, we need to revive it by converting all resolved stages to running stages.

And it's better to keep the plan in the running stages in encoded form, to avoid the encoding cost when creating hundreds of task definitions, as mentioned in #142. I'll propose a PR for that later.
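For readers following along, a rough sketch of the revive idea described above, using placeholder types rather than the actual ExecutionGraph (the real stages carry plans, task statuses, and much more):

use std::collections::HashMap;

// Placeholder stage state; the real ExecutionStage variants carry full plans.
#[derive(Debug, PartialEq)]
enum Stage {
    Unresolved,
    Resolved,
    Running,
}

struct ExecutionGraph {
    stages: HashMap<usize, Stage>,
}

impl ExecutionGraph {
    /// Convert every resolved stage into a running stage.
    /// Returns true if at least one stage was converted.
    fn revive(&mut self) -> bool {
        let mut revived = false;
        for stage in self.stages.values_mut() {
            if *stage == Stage::Resolved {
                *stage = Stage::Running;
                revived = true;
            }
        }
        revived
    }
}

fn main() {
    let mut graph = ExecutionGraph {
        stages: HashMap::from([(1, Stage::Resolved), (2, Stage::Unresolved)]),
    };
    // Running stages are not persisted, so after loading from the backend the
    // graph is revived before it is cached in memory.
    assert!(graph.revive());
    assert_eq!(graph.stages[&1], Stage::Running);
    assert_eq!(graph.stages[&2], Stage::Unresolved);
}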

@@ -15,61 +15,40 @@
// specific language governing permissions and limitations
Contributor

Should this entire file move to src/state/execution_graph/mod.rs ?

Contributor Author

Maybe that's the standard way. However, doing it that way would remove all of the git history of execution_graph.rs.

Contributor

I think git mv should keep it intact? Definitely if it's a single commit and we fast-forward (vs squash) merge it will.

error!("Fail to find job {} in the active cache and it may not be curated by this scheduler", job_id);
None
Contributor

For now it seems like we should explicitly fail the job here so the job state gets updated.

Contributor Author

Since we haven't implemented the multi-scheduler task-update logic in the executor yet, I prefer to leave the handling logic empty here.

use std::default::Default;
use std::sync::Arc;
use tokio::sync::RwLock;
use tonic::transport::Channel;

type ExecutorClients = Arc<RwLock<HashMap<String, ExecutorGrpcClient<Channel>>>>;
type ExecutionGraphCache = Arc<RwLock<HashMap<String, Arc<RwLock<ExecutionGraph>>>>>;
Contributor

What do you think about making this an Arc<RwLock<VecDeque<(String, ExecutionGraph)>>>? Lookup by job ID becomes O(n), but the number of active jobs shouldn't be prohibitive, and you get a proper queue in return so we can do FIFO scheduling of new jobs.

Contributor Author

I don't think it's a good idea 🤣. If we dequeue the ExecutionGraph and meanwhile look up the graph, we can get nothing. And the number of active jobs may be large; in our case, it may be thousands.

Contributor

Ok, we can sort it out in a future PR. But I think we do want to take advantage of this change to introduce proper queueing of jobs for fair scheduling. It could be as simple as another VecDeque<String> that represents the queue, which we could use to indicate priority.
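A minimal sketch of that direction, assuming we keep the HashMap for O(1) lookup by job ID and add a separate VecDeque<String> of job IDs for FIFO ordering; the names and helpers here are illustrative, not the actual Ballista code:

use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;

// Placeholder for the real ExecutionGraph.
struct ExecutionGraph;

// Fast lookup by job ID stays in the map; the queue only orders job IDs.
type ExecutionGraphCache = Arc<RwLock<HashMap<String, Arc<RwLock<ExecutionGraph>>>>>;
type JobQueue = Arc<RwLock<VecDeque<String>>>;

async fn submit_job(cache: &ExecutionGraphCache, queue: &JobQueue, job_id: String) {
    cache
        .write()
        .await
        .insert(job_id.clone(), Arc::new(RwLock::new(ExecutionGraph)));
    queue.write().await.push_back(job_id);
}

/// Pop the oldest queued job that is still in the cache (FIFO scheduling).
async fn next_job(
    cache: &ExecutionGraphCache,
    queue: &JobQueue,
) -> Option<Arc<RwLock<ExecutionGraph>>> {
    loop {
        let job_id = queue.write().await.pop_front()?;
        if let Some(graph) = cache.read().await.get(&job_id) {
            return Some(graph.clone());
        }
        // Job was already removed from the cache; skip it and keep looking.
    }
}

#[tokio::main]
async fn main() {
    let cache: ExecutionGraphCache = Arc::new(RwLock::new(HashMap::new()));
    let queue: JobQueue = Arc::new(RwLock::new(VecDeque::new()));
    submit_job(&cache, &queue, "job-1".to_string()).await;
    assert!(next_job(&cache, &queue).await.is_some());
    assert!(next_job(&cache, &queue).await.is_none());
}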


#[derive(Clone)]
pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
Contributor

This requires changes to the executors as well, right? Currently the executor will just have a single connection to the scheduler, but it will need to accept the scheduler URL as part of the task definition for this to work correctly.

Contributor Author
@yahoNanJing Aug 25, 2022

You are correct. For this part, I'll propose another PR later.

Contributor

I think we would have to implement that as part of this PR or else any multi-scheduler deployment would break.

Contributor Author

I don't think we can achieve multiple schedulers now, since the executor only knows about one scheduler.

Contributor

The executor only has one scheduler connection, but the schedulers can (currently) assign tasks to any executor since the executor pool is shared global state. So they would currently report status to only one scheduler instance (unless the scheduler endpoint was pointing to a load balancer or something), but there is no guarantee that they would have received the task assignment from that scheduler.

I think it should be relatively straightforward though. The TaskAssignment would just need a scheduler URL in it and the executor would report status back to that URL (instead of maintaining a single global connection).
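A minimal sketch of that shape, using illustrative Rust structs with hypothetical field names rather than the real protobuf TaskAssignment and TaskDefinition messages:

// Illustrative only: the real messages are defined in the Ballista protobuf,
// and the field names here are hypothetical.
struct TaskDefinition {
    job_id: String,
    stage_id: u32,
    partition_id: u32,
}

struct TaskAssignment {
    task: TaskDefinition,
    /// Endpoint of the scheduler curating this job; the executor reports task
    /// status back here instead of over a single global scheduler connection.
    scheduler_url: String,
}

fn report_endpoint(assignment: &TaskAssignment) -> &str {
    &assignment.scheduler_url
}

fn main() {
    let assignment = TaskAssignment {
        task: TaskDefinition {
            job_id: "job-1".to_string(),
            stage_id: 1,
            partition_id: 0,
        },
        scheduler_url: "http://scheduler-2:50050".to_string(),
    };
    assert_eq!(report_endpoint(&assignment), "http://scheduler-2:50050");
}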

Member

I need to get back up to speed with the current architecture before I can help with this review. I won't have time to do that until this weekend, unfortunately.

It would be good if we had a doc in this repo with the architecture we are working towards to make sure we are all on the same page. I can maybe try and start that if nobody beats me to it.

Contributor Author
@yahoNanJing Aug 26, 2022

Even for push-based task scheduling, if all of the executors report task updates to only one scheduler, then when the whole cluster is busy, only that scheduler will be busy scheduling tasks and the other schedulers will be idle, because there are no ExecutorReservations left for them.

Contributor Author

I'm OK with adding another commit for the multi-scheduler change in the executors to this PR. However, it may take a few days.

Contributor

What we do is configure the scheduler endpoint in the executor to be the Kubernetes service endpoint. So this will load balance executor registrations across the schedulers, but any given executor should maintain a persistent connection to only the scheduler it registered with.

Contributor Author

Got it. I'll propose the commit early next week.

@andygrove
Member

andygrove commented Aug 25, 2022

@yahoNanJing This is the next PR to merge (once there is consensus), but there are some conflicts now. Could you fix them?

@yahoNanJing
Contributor Author

Thanks @andygrove. Just resolved the conflicts by rebasing.

@andygrove
Member

Thanks for the PR @yahoNanJing and thanks for the reviews @thinkharderdev and @avantgardnerio. It looks like there is consensus here, so I will go ahead and merge this.

@andygrove merged commit b0c792d into apache:master Aug 27, 2022
@yahoNanJing mentioned this pull request Aug 30, 2022