Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: add watch::Sender::send_modify method #4310

Merged
merged 11 commits into from
Feb 22, 2022
66 changes: 49 additions & 17 deletions tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use crate::loom::sync::atomic::Ordering::Relaxed;
use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
use std::mem;
use std::ops;
use std::panic;

/// Receives values from the associated [`Sender`](struct@Sender).
///
Expand Down Expand Up @@ -437,27 +438,40 @@ impl<T> Sender<T> {
Ok(())
}

/// Sends a new value via the channel, notifying all receivers and returning
/// the previous value in the channel.
/// Modifies watched value, notifying all receivers.
///
/// This can be useful for reusing the buffers inside a watched value.
/// Additionally, this method permits sending values even when there are no
/// receivers.
/// This can useful for modifying the watched value, without
/// having to allocate a new instance. Additionally, this
/// method permits sending values even when there are no receivers.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// let (tx, _rx) = watch::channel(1);
/// assert_eq!(tx.send_replace(2), 1);
/// assert_eq!(tx.send_replace(3), 2);
/// struct State {
/// counter: usize,
/// }
/// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
/// state_tx.send_modify(|state| state.counter += 1);
/// assert_eq!(state_rx.borrow().counter, 1);
/// ```
pub fn send_replace(&self, value: T) -> T {
let old = {
pub fn send_modify<F>(&self, func: F)
where
F: FnOnce(&mut T),
{
nylonicious marked this conversation as resolved.
Show resolved Hide resolved
{
// Acquire the write lock and update the value.
let mut lock = self.shared.value.write().unwrap();
let old = mem::replace(&mut *lock, value);
// Update the value and catch possible panic inside func.
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
func(&mut lock);
}));
// If the func panicked return the panic to the caller.
if let Err(error) = result {
// Drop the lock to avoid poisoning it.
drop(lock);
panic::resume_unwind(error);
}
nylonicious marked this conversation as resolved.
Show resolved Hide resolved

self.shared.state.increment_version();

Expand All @@ -467,14 +481,32 @@ impl<T> Sender<T> {
// that receivers are able to figure out the version number of the
// value they are currently looking at.
drop(lock);
}

old
};

// Notify all watchers
self.shared.notify_rx.notify_waiters();
}

/// Sends a new value via the channel, notifying all receivers and returning
/// the previous value in the channel.
///
/// This can be useful for reusing the buffers inside a watched value.
/// Additionally, this method permits sending values even when there are no
/// receivers.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// let (tx, _rx) = watch::channel(1);
/// assert_eq!(tx.send_replace(2), 1);
/// assert_eq!(tx.send_replace(3), 2);
/// ```
pub fn send_replace(&self, mut value: T) -> T {
// swap old watched value with the new one
self.send_modify(|old| mem::swap(old, &mut value));

old
value
}
nylonicious marked this conversation as resolved.
Show resolved Hide resolved

/// Returns a reference to the most recently sent value
Expand Down