
Clean up job data on both Scheduler and Executor #188

Merged (3 commits) on Oct 12, 2022

Conversation

mingmwang
Contributor

Which issue does this PR close?

Closes #9 and #185.

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

@mingmwang
Contributor Author

Keyspace::CompletedJobs
};

let lock = state.lock(keyspace.clone(), "").await?;
Member

I think this will lock the whole keyspace in standalone mode 🤔

    async fn lock(&self, keyspace: Keyspace, key: &str) -> Result<Box<dyn Lock>> {
        let mut mlock = self.locks.lock().await;
        let lock_key = format!("/{:?}/{}", keyspace, key);
        if let Some(lock) = mlock.get(&lock_key) {
            // An entry already exists for this keyspace/key: lock the shared
            // mutex and return the owned guard
            Ok(Box::new(lock.clone().lock_owned().await))
        } else {
            // First lock request for this key: create the mutex, register it, and lock it
            let new_lock = Arc::new(Mutex::new(()));
            mlock.insert(lock_key, new_lock.clone());
            Ok(Box::new(new_lock.lock_owned().await))
        }
    }

Member

Oh, this lock is only taken inside async fn lock(&self, keyspace: Keyspace, key: &str), and it will be released quickly after that call.
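
For reference, lock_owned() on a tokio::sync::Mutex returns an owned guard, so the mutex stays locked until that guard is dropped rather than ending when the lock() helper returns. A minimal standalone sketch of that behaviour (not Ballista code):

    use std::sync::Arc;
    use tokio::sync::Mutex;

    #[tokio::main]
    async fn main() {
        let m = Arc::new(Mutex::new(()));

        // lock_owned() returns an OwnedMutexGuard; the mutex stays locked
        // until this guard is dropped, even after the acquiring function returns.
        let guard = m.clone().lock_owned().await;

        // The mutex is held while `guard` is alive; a second lock_owned()
        // call would wait here. Drop the guard to release the lock.
        drop(guard);
        let _second = m.lock_owned().await; // acquires immediately now
    }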

Comment on lines 618 to 621
let alive_executors = executor_manager.get_alive_executors_within_one_minute();
for executor in alive_executors {
let job_id_clone = job_id.to_owned();
let executor_manager_clone = executor_manager.clone();
Member

I think having each SQL query send an RPC to all executors is not a good idea (for interactive queries that finish in milliseconds) 🤔

Member

Maybe we can make some improvements later!

Contributor Author

Yes, in the current code base the TaskManager/ExecutionGraph does not track which executors the tasks were executed on.
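
For illustration only, a rough sketch (not Ballista's actual types) of the improvement discussed above: record which executors ran tasks for each job, so that clean-up RPCs can target just those executors instead of broadcasting to every alive executor.

    use std::collections::{HashMap, HashSet};

    // Hypothetical index: job_id -> executor_ids that ran at least one task for the job
    #[derive(Default)]
    struct JobExecutorIndex {
        executors_by_job: HashMap<String, HashSet<String>>,
    }

    impl JobExecutorIndex {
        // Called whenever a task status update arrives from an executor
        fn record_task(&mut self, job_id: &str, executor_id: &str) {
            self.executors_by_job
                .entry(job_id.to_string())
                .or_default()
                .insert(executor_id.to_string());
        }

        // Executors that need a clean-up RPC when the job finishes
        fn executors_for(&self, job_id: &str) -> Vec<String> {
            self.executors_by_job
                .get(job_id)
                .map(|ids| ids.iter().cloned().collect())
                .unwrap_or_default()
        }
    }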

let job_id_str = job_id.to_owned();
let active_job_cache = self.active_job_cache.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(CLEANUP_FINISHED_JOB_DELAY_SECS))
Member

(For interactive queries that finish in milliseconds) I think this will keep a lot of futures alive on the heap. Just my opinion.
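
For context, a minimal sketch of the spawn-per-job pattern being discussed here (names simplified, not the PR's exact code): each finished job spawns a task that sleeps for the whole delay before cleaning up, which is what the concern about many futures on the heap refers to.

    use std::time::Duration;

    const CLEANUP_FINISHED_JOB_DELAY_SECS: u64 = 300;

    // One such task is spawned per finished job and stays alive for the full delay
    fn schedule_cleanup(job_id: String) {
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_secs(CLEANUP_FINISHED_JOB_DELAY_SECS)).await;
            // ...delete the job's state and shuffle data here...
            println!("cleaning up data for job {}", job_id);
        });
    }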

error!("{}", msg);
Status::internal(msg)
})?
.post_event(QueryStageSchedulerEvent::JobCancel(job_id))
Contributor

Nice refinement. It's better to have only one entry point for modifying the scheduler state; every state change should have a corresponding event and be handled in the event loop.
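
For reference, a generic sketch of the single-entry-point pattern praised here (illustrative names, not Ballista's actual event types): every state change is expressed as an event posted to one channel and handled in a single loop.

    use tokio::sync::mpsc;

    #[derive(Debug)]
    enum SchedulerEvent {
        JobCancel(String),
        JobDataClean(String),
    }

    // The only place that mutates scheduler state: one loop draining one channel
    async fn run_event_loop(mut rx: mpsc::Receiver<SchedulerEvent>) {
        while let Some(event) = rx.recv().await {
            match event {
                SchedulerEvent::JobCancel(job_id) => {
                    println!("cancelling job {}", job_id);
                }
                SchedulerEvent::JobDataClean(job_id) => {
                    println!("cleaning up data for job {}", job_id);
                }
            }
        }
    }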

@andygrove
Member

andygrove commented Sep 25, 2022

I am constantly filling my disk up with shuffle files so would love to see us get this merged before the 0.9.0 release.

@mingmwang Could you rebase when you get a chance and I will test this out and review the PR as well.

@mingmwang
Contributor Author

I am constantly filling my disk up with shuffle files so would love to see us get this merged before the 0.9.0 release.

@mingmwang Could you rebase when you get a chance and I will test this out and review the PR as well.

Sure, working on it.

@mingmwang
Contributor Author

@andygrove @yahoNanJing @Ted-Jiang

BTW, in this PR the job data in the state store will also be deleted after 300s.
I think we need a follow-up PR to move the completed (Success or Failed) job data from the state store to the ObjectStore
for long-term storage, so that the Scheduler UI can read it from the ObjectStore.

Please share your thoughts.

const CLEANUP_FINISHED_JOB_DELAY_SECS: u64 = 300;

    async fn clean_up_job_data(
        state: Arc<dyn StateBackendClient>,
        active_job_cache: ExecutionGraphCache,
        failed: bool,
        job_id: String,
        executor_manager: Option<ExecutorManager>,
    ) -> Result<()> {
        // Drop the job's execution graph from the in-memory cache
        let mut active_graph_cache = active_job_cache.write().await;
        active_graph_cache.remove(&job_id);

        // Finished jobs live in either the FailedJobs or CompletedJobs keyspace
        let keyspace = if failed {
            Keyspace::FailedJobs
        } else {
            Keyspace::CompletedJobs
        };

        // Delete the job's entry from the state backend under the keyspace lock
        let lock = state.lock(keyspace.clone(), "").await?;
        with_lock(lock, state.delete(keyspace, &job_id)).await?;

        // Also clean up the job's data on the executors, if an ExecutorManager was provided
        if let Some(em) = executor_manager {
            Self::clean_up_executors_data(job_id.clone(), em).await;
        }
        Ok(())
    }

type ExecutionGraphCache = Arc<RwLock<HashMap<String, Arc<RwLock<ExecutionGraph>>>>>;

const CLEANUP_FINISHED_JOB_DELAY_SECS: u64 = 300;
Member

We should make this configurable. Some of the queries I am testing take much longer than 300 seconds. We already have the ability to set configs on the context.

Member

Never mind, this is a delay after the job completes. I would still like to see this configurable, but we could do that in a follow-on PR.
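
As a rough illustration of the follow-up suggested here (the review prefers the existing context configs; the environment-variable name below is purely hypothetical), the delay could be read from configuration with the current constant as the default:

    use std::time::Duration;

    const DEFAULT_CLEANUP_FINISHED_JOB_DELAY_SECS: u64 = 300;

    // Hypothetical: read the delay from an env var and fall back to the default
    fn cleanup_finished_job_delay() -> Duration {
        let secs = std::env::var("BALLISTA_FINISHED_JOB_DATA_CLEAN_UP_DELAY_SECS")
            .ok()
            .and_then(|v| v.parse::<u64>().ok())
            .unwrap_or(DEFAULT_CLEANUP_FINISHED_JOB_DELAY_SECS);
        Duration::from_secs(secs)
    }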

Member

@andygrove andygrove left a comment

LGTM. Thanks @mingmwang

@andygrove
Member

@mingmwang could you fix the conflicts here when you have the time so that we can merge this?

@mingmwang
Contributor Author

@mingmwang could you fix the conflicts here when you have the time so that we can merge this?

Sure, I will fix the conflicts tomorrow.

@mingmwang
Contributor Author

Resolved conflicts.

@andygrove
Member

Thanks again @mingmwang

@andygrove andygrove merged commit e42a6c9 into apache:master Oct 12, 2022
Successfully merging this pull request may close these issues.

Need clean up intermediate data in Ballista
4 participants