Refactor the event channel #1912

Merged
merged 3 commits into from
Mar 9, 2022

Conversation

yahoNanJing
Contributor

Which issue does this PR close?

Closes #1909.

Rationale for this change

Mentioned in #1909.

What changes are included in this PR?

Introduce the following for event-based processing:

  • EventAction as the common trait
  • EventLoop as the common struct

Are there any user-facing changes?

It's blocked by #1911.

tokio::spawn(async move {
    info!("Starting the event loop {}", name);
    while !stopped.load(Ordering::SeqCst) {
        let event = rx_event.recv().await.unwrap();
Contributor

So I understand, if the channel is closed, this will panic and the task will die?

Contributor Author

Yes. In general, the channel won't be closed until the scheduler is down. The dangling task should be handled in another way, such as speculative task execution.

Contributor

In IOx and elsewhere, instead of panicking when the other end of a channel shuts down, we do something like

info!("Event Channel closed, shutting down");
return

so it is clear from the logs what is happening and gives the process / task a chance at a clean shutdown
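
A minimal sketch of that shutdown pattern, for illustration only. It assumes a blocking std::sync::mpsc channel and a plain thread in place of the tokio channel and task used in this PR, and the Event type and function names are hypothetical. The point is that a closed channel is matched and logged instead of being unwrapped:

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical event type, used only for this illustration.
#[derive(Debug)]
pub enum Event {
    JobSubmitted(u32),
}

/// Runs a consumer loop on its own thread and returns how many events it
/// handled before the channel closed.
pub fn run_event_loop(
    name: &'static str,
    rx_event: Receiver<Event>,
) -> thread::JoinHandle<usize> {
    thread::spawn(move || {
        println!("Starting the event loop {}", name);
        let mut handled = 0;
        loop {
            match rx_event.recv() {
                Ok(event) => {
                    println!("{} handling {:?}", name, event);
                    handled += 1;
                }
                // recv() returns Err once every Sender has been dropped.
                Err(_) => {
                    println!("Event Channel closed, shutting down");
                    return handled;
                }
            }
        }
    })
}

/// Sends `n` events, drops the sender, and waits for the loop to exit.
pub fn demo_clean_shutdown(n: u32) -> usize {
    let (tx_event, rx_event) = mpsc::channel();
    let handle = run_event_loop("demo", rx_event);
    for id in 0..n {
        tx_event
            .send(Event::JobSubmitted(id))
            .expect("receiver is alive");
    }
    drop(tx_event); // closing the channel ends the loop without a panic
    handle.join().expect("event loop thread must not panic")
}
```

With the tokio channel the same shape would apply, since its recv() yields None once all senders are gone; that branch is where the log line and return would go.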

Contributor Author

Thanks @alamb. I'll add the informative log.

{
    if let Some(event_loop) = self.event_loop.as_mut() {
        // It's OK here, since we are sure the mutable reference only be used in the initialization
        unsafe {
Contributor

Can we avoid this by just not wrapping the EventLoop in an Arc?

Contributor Author

Actually, in this PR the EventLoop doesn't need an Arc. However, according to the overall processing figure in #1704, the event loop will be used elsewhere. The related PR is in progress.

Contributor

I see. In that case, does the EventLoop need to be passed around, or can we just pass a Sender to the different producers? The EventLoop could then still be owned, and multiple producers could each hold a clone of the event loop's Sender.

Maybe @alamb could comment on how much we should care about avoiding unsafe.

Contributor

The philosophy in Rust in general is to avoid unsafe as much as possible. Typically unsafe is used for one of two reasons:

  1. Interfacing with other languages (like C/C++) that don't have the same lifetime system as Rust
  2. Performance where the overhead of doing things like array bounds checks is too great

Neither of those situations seems to apply here.

I would personally suggest figuring out some way to avoid this unsafe block. @thinkharderdev has one suggestion.

I don't know this code well enough to offer anything specific, but another pattern for something like this is "Interior Mutability", which is a fancy Rust way of saying use something like Mutex. Perhaps

event_loop: Option<Arc<Mutex<EventLoop<SchedulerServerEvent>>>>,
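
To make the interior mutability idea concrete, here is a small sketch under stated assumptions: EventLoop and Server below are hypothetical stand-ins, not the types in this PR, and the only thing being shown is that initialization can mutate through a shared reference by taking the lock, with no unsafe block:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the real EventLoop; it only tracks whether it
/// has been started.
pub struct EventLoop {
    started: bool,
}

impl EventLoop {
    pub fn start(&mut self) {
        self.started = true;
    }

    pub fn is_started(&self) -> bool {
        self.started
    }
}

/// Hypothetical owner that shares the loop behind Arc<Mutex<..>>.
pub struct Server {
    event_loop: Option<Arc<Mutex<EventLoop>>>,
}

impl Server {
    pub fn new() -> Self {
        let event_loop = EventLoop { started: false };
        Server {
            event_loop: Some(Arc::new(Mutex::new(event_loop))),
        }
    }

    /// Mutates through &self: the lock grants the exclusive access that the
    /// unsafe block was being used for.
    pub fn init(&self) {
        if let Some(event_loop) = self.event_loop.as_ref() {
            event_loop.lock().expect("lock poisoned").start();
        }
    }

    /// Reads the state from another thread through a cloned Arc.
    pub fn started_seen_from_other_thread(&self) -> bool {
        let shared = self
            .event_loop
            .as_ref()
            .map(Arc::clone)
            .expect("event loop configured");
        thread::spawn(move || {
            let guard = shared.lock().expect("lock poisoned");
            guard.is_started()
        })
        .join()
        .expect("reader thread must not panic")
    }
}
```

The cost is one lock acquisition per access, which is the overhead being weighed in the replies that follow.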

Contributor Author

Hi @alamb, the reason I prefer not to add a Mutex here is that the mutation only happens in the initialization phase. Everywhere else, it is only read, so we can avoid the overhead of locks.

@thinkharderdev, this PR is for providing some encapsulation for the channel processing and extracting some common behaviors in the EventLoop.

Contributor

Right, I think it's a great idea to encapsulate the event loop, but I think we can avoid the unsafe and the overhead of a Mutex or RwLock by decoupling the producer and consumer interface. If we want to encapsulate the error handling around sending an event on the underlying channel then perhaps we can create an explicit EventProducer like:

pub struct EventProducer<E> {
    tx_event: mpsc::Sender<E>,
}

impl<E> EventProducer<E> {
    pub async fn post_event(&self, event: E) -> Result<()> {
        // The producer always holds a Sender, so no Option check is needed here.
        self.tx_event.send(event).await.map_err(|e| {
            BallistaError::General(format!("Fail to send event due to {}", e))
        })
    }
}

impl<E> EventLoop<E> {
    // Hands out a producer that holds a clone of the loop's Sender.
    pub fn producer(&self) -> Result<EventProducer<E>> {
        Ok(EventProducer {
            tx_event: self.tx_event.clone().ok_or(...)?,
        })
    }
}
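
For readers who want to try the producer / consumer split in isolation, here is a self-contained sketch. It is an assumption-laden stand-in, not the code from this PR: it uses the blocking std::sync::mpsc channel instead of tokio, a String error instead of BallistaError, and the run and demo_two_producers functions are hypothetical.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Handle that only exposes the sending side of the channel.
pub struct EventProducer<E> {
    tx_event: Sender<E>,
}

impl<E> EventProducer<E> {
    /// Returns an error message instead of panicking when the consumer is gone.
    pub fn post_event(&self, event: E) -> Result<(), String> {
        self.tx_event
            .send(event)
            .map_err(|e| format!("Fail to send event due to {}", e))
    }
}

/// Owns the receiving side, so it never has to be shared behind an Arc.
pub struct EventLoop<E> {
    tx_event: Sender<E>,
    rx_event: Receiver<E>,
}

impl<E> EventLoop<E> {
    pub fn new() -> Self {
        let (tx_event, rx_event) = mpsc::channel();
        EventLoop { tx_event, rx_event }
    }

    /// Hands out a producer holding a clone of the loop's Sender.
    pub fn producer(&self) -> EventProducer<E> {
        EventProducer {
            tx_event: self.tx_event.clone(),
        }
    }

    /// Consumes the loop and drains events until every producer is dropped.
    pub fn run(self) -> Vec<E> {
        let EventLoop { tx_event, rx_event } = self;
        drop(tx_event); // otherwise the loop's own Sender keeps the channel open
        rx_event.into_iter().collect()
    }
}

/// Two producer threads post one event each; the owned loop then drains them.
pub fn demo_two_producers() -> Vec<u32> {
    let event_loop: EventLoop<u32> = EventLoop::new();
    let workers: Vec<_> = (0..2u32)
        .map(|id| {
            let producer = event_loop.producer();
            thread::spawn(move || producer.post_event(id))
        })
        .collect();
    for worker in workers {
        worker
            .join()
            .expect("producer thread must not panic")
            .expect("loop is alive");
    }
    let mut events = event_loop.run();
    events.sort();
    events
}
```

Because only Sender clones cross thread boundaries, the loop itself needs neither unsafe nor a lock in this arrangement.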

Contributor

@alamb alamb Mar 7, 2022

My personal recommendation is to go with the safe approach (even if it seems slower at first, as it may require locks). My rationale is:

I have spent countless hours tracking down strange, subtle and hard to reproduce bugs related to memory corruption when working on C/C++ systems. My Rust experience has largely been free of such pain, and I think the avoidance of that pain is a key advantage of Rust and makes some of the painful parts of Rust (like the borrow checker) worthwhile. While the unsafe during initialization is fine now, if the code changes over time there is a real chance of race conditions or other situations not currently anticipated.

I also think the overhead of taking a mutex is likely to be low, and you could make it lower still by using a RwLock instead.

Even better would be if you can refactor the code so that it is clear when the initialization has occurred, so you need neither mutex nor unsafe, perhaps as @thinkharderdev is suggesting.

I like to think in most matters I am pretty pragmatic, but the use of unsafe is something I do feel quite strongly about.
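
As a rough illustration of the RwLock variant for the "written once during initialization, read everywhere else" case, the sketch below uses a hypothetical SharedLoop type that is not part of this PR. Readers take the shared lock and so do not block one another:

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical shared state: written once during initialization and only
/// read afterwards.
pub struct SharedLoop {
    name: RwLock<Option<String>>,
}

impl SharedLoop {
    pub fn new() -> Self {
        SharedLoop {
            name: RwLock::new(None),
        }
    }

    /// The only writer; holds the exclusive lock briefly.
    pub fn init(&self, name: &str) {
        let mut slot = self.name.write().expect("lock poisoned");
        *slot = Some(name.to_string());
    }

    /// Readers take the shared lock and copy the value out.
    pub fn name(&self) -> Option<String> {
        let slot = self.name.read().expect("lock poisoned");
        (*slot).clone()
    }
}

/// Initializes once, then reads from `readers` threads concurrently.
pub fn demo_concurrent_readers(readers: usize) -> Vec<Option<String>> {
    let shared = Arc::new(SharedLoop::new());
    shared.init("scheduler");
    let handles: Vec<_> = (0..readers)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || shared.name())
        })
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().expect("reader must not panic"))
        .collect()
}
```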

Contributor Author

Thanks @thinkharderdev and @alamb. I'll refine the code according to @thinkharderdev's suggestion.

Contributor

@alamb alamb left a comment

Thanks @yahoNanJing -- I skimmed this PR and the basic idea of breaking the system down into smaller modules I think is a great direction.

One thing I wanted to mention that might help with review / merge speed in the future is to break such refactoring PRs into two parts:

  1. A PR that just moves code around / breaks up files but doesn't change behavior
  2. The PR that changes behavior

The reason is that moving files around often causes large diffs in GitHub, but if there is no change in behavior they can be straightforward to review / merge.

The changes in behavior may take some more thought / comment and can be harder to understand in a larger diff.

Also, a larger PR has a greater chance of accumulating conflicts, so if behavior changes get delayed in review, that can also cause issues.

EventAction<SchedulerServerEvent> for SchedulerServerEventAction<T, U>
{
// TODO
fn on_start(&self) {}
Contributor

I am not sure what the plan is here, but sometimes the todo!() macro gets used in situations like this so the TODO isn't silently forgotten. However, since todo!() panics, this may not be possible if the functions are actually called.

Suggested change
- fn on_start(&self) {}
+ fn on_start(&self) {
+     todo!();
+ }

Contributor Author

@yahoNanJing yahoNanJing Mar 7, 2022

Here, the reason not to use todo!() is to avoid a panic. The TODO comments are there as reminders for future error handling or other work to make the whole system more robust. If they are confusing, we can just remove them for now.
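
The trade-off in this thread can be seen in a small sketch. The trait and types below are hypothetical and only mirror the shape of the hook being discussed: an empty body with a TODO comment is a silent no-op, while a body containing todo!() panics as soon as the hook is called.

```rust
use std::panic;

/// Hypothetical trait mirroring the hook under discussion.
pub trait EventAction {
    fn on_start(&self);
}

/// Variant with an empty body: calling it does nothing.
pub struct NoOpAction;

impl EventAction for NoOpAction {
    // TODO: add error handling
    fn on_start(&self) {}
}

/// Variant using the macro: calling it panics with "not yet implemented".
pub struct TodoAction;

impl EventAction for TodoAction {
    fn on_start(&self) {
        todo!();
    }
}

/// Returns true if calling the hook panicked.
pub fn hook_panics<A: EventAction + panic::RefUnwindSafe>(action: &A) -> bool {
    panic::catch_unwind(|| action.on_start()).is_err()
}
```

Since the event loop calls on_start when it starts, the todo!() variant would abort that task, which is the concern raised in the reply above.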

@liukun4515
Contributor

I think we can review this pull request after rebase.
@alamb

@yahoNanJing
Contributor Author

Thanks @alamb. The motivation for this refactoring is mainly to achieve the final goal of #1704. Without these refactorings, all of the complex logic would be mixed together in one or two files, which is really hard to maintain.

In this step-by-step refactoring, every PR will be tested and verified not to change behavior.

}
_ => {}
} else {
info!("Event Channel closed, shutting down");
Contributor

👍

@alamb
Contributor

alamb commented Mar 7, 2022

I am happy with this PR now other than the unsafe -- if @thinkharderdev or @liukun4515 is ok with the unsafe approach, I'll be happy to merge this PR. Otherwise I really suggest going with a RwLock

@thinkharderdev
Contributor

I would also prefer we remove the unsafe block before merging.

@yahoNanJing
Contributor Author

Hi @thinkharderdev and @alamb, could you help review the latest commit, which fixes the unsafe usage?

Contributor

@alamb alamb left a comment

Looks great to me. Thank you @yahoNanJing and @thinkharderdev

I believe @liukun4515 is planning to review this tomorrow so I will hold off on merging until his review is complete

Contributor

@thinkharderdev thinkharderdev left a comment

Looks great!

Contributor

@liukun4515 liukun4515 left a comment

LGTM

@alamb alamb merged commit 48d6450 into apache:master Mar 9, 2022
@alamb
Contributor

alamb commented Mar 9, 2022

Thanks again @yahoNanJing and to @thinkharderdev and @liukun4515 for the reviews

Development

Successfully merging this pull request may close these issues.

[Ballista] Refactor the event channel
5 participants