
Refactor scheduler state with different management policy for volatile and stable states #1810

Merged

2 commits merged into apache:master on Feb 16, 2022

Conversation

yahoNanJing
Contributor

Which issue does this PR close?

Closes #1703.

Rationale for this change

See the detailed discussion in #1703.

What changes are included in this PR?

Classify the states in the SchedulerState into two categories: VolatileSchedulerState and StableSchedulerState.
According to #1703, the VolatileSchedulerState maintains the following states in memory:

  • executor heartbeat
  • executor data, mainly for available resources
  • tasks

And StableSchedulerState maintains the following states in both memory and storage db:

  • executor metadata, mainly for the cluster topology info
  • jobs
  • stages

The stable states are stored in both memory and the storage db; the in-memory copies serve as a cache for fast reads and to reduce deserialization cost.

The previous Watch in ConfigBackendClient for task status is no longer used. Instead, we leverage an event channel to propagate job status updates. Later, with more state info such as pending tasks, the job status update can be handled explicitly, so the current cumbersome update implementation can be removed.
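The split described above can be sketched as follows. This is a minimal, std-only illustration: the names ExecutorHeartbeat, VolatileSchedulerState, and StableSchedulerState mirror the PR, but the fields and methods here are simplified stand-ins, not the actual implementation.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Simplified stand-in for the PR's heartbeat type.
#[derive(Clone, Debug)]
struct ExecutorHeartbeat {
    executor_id: String,
    timestamp: u64,
}

/// Volatile state lives only in memory: it is cheap to rebuild,
/// since executors re-register and re-send heartbeats after a restart.
#[derive(Clone, Default)]
struct VolatileSchedulerState {
    executors_heartbeat: Arc<RwLock<HashMap<String, ExecutorHeartbeat>>>,
}

/// Stable state is persisted to the storage db; the in-memory map is a
/// read cache that avoids repeated deserialization. The db handle is
/// omitted in this sketch.
#[derive(Clone, Default)]
struct StableSchedulerState {
    executors_metadata: Arc<RwLock<HashMap<String, String>>>,
}

impl VolatileSchedulerState {
    fn save_heartbeat(&self, hb: ExecutorHeartbeat) {
        // In-memory only: no write-through to the storage db.
        self.executors_heartbeat
            .write()
            .unwrap()
            .insert(hb.executor_id.clone(), hb);
    }

    fn get_heartbeat(&self, executor_id: &str) -> Option<ExecutorHeartbeat> {
        self.executors_heartbeat
            .read()
            .unwrap()
            .get(executor_id)
            .cloned()
    }
}

fn main() {
    let state = VolatileSchedulerState::default();
    state.save_heartbeat(ExecutorHeartbeat {
        executor_id: "exec-1".to_string(),
        timestamp: 1,
    });
    println!("{:?}", state.get_heartbeat("exec-1"));
    let _stable = StableSchedulerState::default(); // persisted state would live here
}
```

The design rationale is that heartbeats, available resources, and task status are safe to keep memory-only because they are naturally refreshed by the executors, while cluster topology, jobs, and stages must survive a scheduler restart.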

Comment on lines 155 to 167
.receive_heart_beat(HeartBeatParams {
executor_id: self.executor.metadata.id.clone(),
state: Some(self.get_executor_state().await.into()),
})
.await
Contributor

Maybe it would be good to rename the method to heart_beat_from_executor().

Contributor Author

Agree. It's clearer than before.

Comment on lines 206 to 227
async fn get_executor_state(&self) -> SExecutorState {
SExecutorState {
available_memory_size: u64::MAX,
}
}
Contributor

Why is this method async?

tokio_stream::wrappers::TcpListenerStream::new(listener),
),
);

let executor_meta = ExecutorRegistration {
id: Uuid::new_v4().to_string(), // assign this executor a unique ID
optional_host: Some(OptionalHost::Host("localhost".to_string())),
port: addr.port() as u32,
// TODO Make it configurable
grpc_port: 50020,
Contributor

Please do not hard-code the grpc_port; please make it configurable, otherwise we will be unable to start multiple executor instances on one node.

Contributor Author

It's mainly for unit tests. For production execution, it's already configurable.

Contributor

Note that this PR doesn't introduce the hard-coded port -- perhaps it is worth a ticket to track making the value configurable.

Contributor

This doesn't actually compile currently. Looks like the CI build doesn't compile/test with the standalone feature so I broke this with my PR from a few days ago.

Contributor

I patched up the compile errors in #1839 -- looks like a test is also failing https://github.com/apache/arrow-datafusion/issues/1840

Contributor Author

Hi @alamb and @thinkharderdev, should we fix the standalone unit test issue here or in another PR?

Contributor

@yahoNanJing @alamb already pushed a PR to address the compilation issue #1839 so you're good.

.into_iter()
.filter(|e| alive_executors.contains_key(&e.executor_id))
.collect();
let mut available_executors = self.state.get_available_executors_data();

// In case there are not enough resources, reschedule the tasks of the job
if available_executors.is_empty() {
Contributor

Why spawn another future just to sleep? I think you can simply sleep in the schedule_job() method if there are not enough resources.

Contributor Author

Agree. Since the inner sleep is async, it will not block the execution threads, so there's no need to spawn another task for this.
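The point can be illustrated with a small retry loop. This is a hypothetical, std-only sketch: schedule_job, the injected available_slots closure, and the retry bound are all illustrative, and std::thread::sleep stands in for the tokio::time::sleep(...).await the real async code would use (which yields to the runtime rather than blocking a thread).

```rust
use std::thread;
use std::time::Duration;

// When no executor has free slots, sleep in place and re-check instead
// of spawning a separate task just to wait.
fn schedule_job(
    mut available_slots: impl FnMut() -> usize,
    max_retries: u32,
) -> Result<usize, String> {
    for _ in 0..max_retries {
        let slots = available_slots();
        if slots > 0 {
            return Ok(slots);
        }
        // No resources yet: back off briefly and poll again.
        // Async version: tokio::time::sleep(...).await
        thread::sleep(Duration::from_millis(1));
    }
    Err("no executors with available task slots".to_string())
}

fn main() {
    // The injected closure models the cluster freeing up on the third poll.
    let mut polls = 0;
    let result = schedule_job(
        || {
            polls += 1;
            if polls < 3 { 0 } else { 4 }
        },
        10,
    );
    println!("{:?}", result);
}
```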

#[derive(Clone)]
pub(super) struct SchedulerState {
struct VolatileSchedulerState {
executors_heartbeat: Arc<std::sync::RwLock<HashMap<String, ExecutorHeartbeat>>>,
Contributor

Should we use RwLock from parking_lot?

Contributor Author

Agree. It should be consistent with the other parts, and the performance may also be better.
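For context on the suggestion: std::sync::RwLock can be poisoned if a thread panics while holding the guard, so read()/write() return a Result that must be handled at every call site, whereas parking_lot::RwLock has no poisoning (its write() returns the guard directly) and is generally faster. A small std-only sketch, where bump is a hypothetical helper:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Increment a named counter behind a std RwLock.
fn bump(counters: &RwLock<HashMap<String, u64>>, key: &str) -> u64 {
    // std: write() yields Result<Guard, PoisonError<_>>.
    // With parking_lot this line would just be `counters.write()`.
    let mut guard = counters.write().expect("lock poisoned");
    let v = guard.entry(key.to_string()).or_insert(0);
    *v += 1;
    *v
}

fn main() {
    let counters = RwLock::new(HashMap::new());
    bump(&counters, "heartbeats");
    println!("{}", bump(&counters, "heartbeats"));
}
```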


// job -> stage -> partition
tasks: Arc<std::sync::RwLock<HashMap<String, JobTasks>>>,
}
Contributor

job -> stage -> task

let stage_tasks = job_tasks
.entry(partition_id.stage_id)
.or_insert_with(HashMap::new);
stage_tasks.insert(partition_id.partition_id, status.clone());
Contributor

Maybe we should rename the partition_id to task_id.

Contributor Author

Agree. It's better to rename partition_id in protobuf::TaskStatus to task_id.

@yahoNanJing
Contributor Author

Since this PR has conflicts with the master code, I'll squash and merge first. Then I'll do the rebase.

Cache volatile state just in memory without storing them in db

Fix ut

Keep volatile state just in memory rather than store them in db

Cache stable state in memory

Fix ut

Fix for mingmwang's comments

Rename partition_id to task_id in protobuf::TaskStatus

Rename the names of the in-memory structs
@yahoNanJing
Contributor Author

Hi @alamb, @houqp, could you review this PR and give some comments?

@alamb
Contributor

alamb commented Feb 14, 2022

Hi @yahoNanJing I will look at this tomorrow. Also FYI I think @houqp may be delayed in responding for a while.

@liukun4515
Contributor

I will look at this later.

Contributor

@alamb alamb left a comment

I am not an expert in this code, and I can't say I understand all of the changes but the basics made sense to me and the tests all still pass, so 👍 from my end.

If @liukun4515 is good with this change I think it can be merged.

cc @edrevo @andygrove @thinkharderdev and @realno in case you would like to review

Otherwise I am happy to merge this PR tomorrow

@alamb
Contributor

alamb commented Feb 15, 2022

Thank you for the contributions @yahoNanJing as well as your patience in the review process

@alamb
Contributor

alamb commented Feb 15, 2022

Also cc @Ted-Jiang

(I am sorry for the wide set of cc's but I don't know which of these contributors are working / coordinating together and I want to keep the information flowing between you all)

executors_data.get(executor_id).cloned()
}

/// There are too checks:
Contributor

Suggested change
/// There are too checks:
/// There are two checks:

tokio::spawn(async move {
info!("Starting the scheduler state watcher");
loop {
let task_status = rx_task.recv().await.unwrap();
Contributor

There are a few things here that may panic, which would terminate the thread. I am wondering how this is handled?

Contributor Author

Thanks @realno. Agree. In many cases it's better to just log an error message. I will push a commit to improve this error handling.
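As an illustration of the non-panicking pattern, here is a std-only sketch using std::sync::mpsc; drain_statuses is a hypothetical helper, and the tokio version would instead match on the Option returned by recv().await.

```rust
use std::sync::mpsc;

// Watcher loop that does not unwrap() each receive: with std's mpsc,
// recv() returns Err once every sender has been dropped, and the loop
// exits cleanly instead of panicking.
fn drain_statuses(rx: mpsc::Receiver<String>) -> Vec<String> {
    let mut seen = Vec::new();
    loop {
        match rx.recv() {
            Ok(status) => seen.push(status),
            Err(_) => {
                // All senders are gone: log and stop instead of panicking.
                eprintln!("task status channel closed; stopping watcher");
                break;
            }
        }
    }
    seen
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send("running".to_string()).unwrap();
    tx.send("completed".to_string()).unwrap();
    drop(tx);
    println!("{:?}", drain_statuses(rx));
}
```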

@realno
Contributor

realno commented Feb 16, 2022

Left a small question, otherwise looks good. Thanks @yahoNanJing !

}
let tx_job = self.scheduler_env.as_ref().unwrap().tx_job.clone();
for job_id in jobs {
tx_job.send(job_id).await.unwrap();
Member

Suggested change
tx_job.send(job_id).await.unwrap();
tx_job.send(job_id).await?;

Member

Should we handle panic here?

Contributor Author

Thanks @Ted-Jiang. Agree. It's better to use ? instead of calling unwrap directly, which can panic.
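A minimal sketch of the suggested pattern, with a plain String standing in for the real error type and resubmit_jobs as a hypothetical helper:

```rust
use std::sync::mpsc::{channel, Sender};

// Propagate send failures with `?` instead of unwrap(): the SendError is
// mapped into the caller's error type and returned early, where unwrap()
// would have panicked and killed the surrounding task.
fn resubmit_jobs(tx_job: &Sender<String>, jobs: Vec<String>) -> Result<usize, String> {
    let mut sent = 0;
    for job_id in jobs {
        tx_job
            .send(job_id)
            .map_err(|e| format!("failed to queue job: {e}"))?;
        sent += 1;
    }
    Ok(sent)
}

fn main() {
    let (tx, _rx) = channel();
    println!("{:?}", resubmit_jobs(&tx, vec!["job-1".to_string()]));
}
```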

@yahoNanJing
Contributor Author

Hi @realno, could you help review this error handling commit?

@alamb
Contributor

alamb commented Feb 16, 2022

Given how large this PR is getting (and thus its potential for accumulating conflicts) I am going to merge it to main and we can keep iterating in future PRs.

I looked at the error handling commit at 12b1c73 and it looked better than unwrap() to me (though there is likely still significant room for UX improvement with error handling).

@alamb alamb merged commit 407adc0 into apache:master Feb 16, 2022
@alamb
Contributor

alamb commented Feb 16, 2022

Thank you everyone who helped review and thanks to @yahoNanJing for the contribution!

@realno
Contributor

realno commented Feb 16, 2022

> Hi @realno, could you help review this error handling commit?

Looks good, thanks!

Successfully merging this pull request may close these issues.

[Ballista] Support to better manage cluster state, like alive executors, executor available task slots, etc
8 participants