Skip to content

Commit

Permalink
sync: add resubscribe method to broadcast::Receiver (#4607)
Browse files Browse the repository at this point in the history
  • Loading branch information
estk authored May 28, 2022
1 parent 05cbfae commit f6c0405
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 0 deletions.
28 changes: 28 additions & 0 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,7 @@ impl<T> Sender<T> {
}
}

/// Create a new `Receiver` which reads starting from the tail.
fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
let mut tail = shared.tail.lock();

Expand Down Expand Up @@ -881,6 +882,33 @@ impl<T> Receiver<T> {
}

impl<T: Clone> Receiver<T> {
/// Re-subscribes to the channel starting from the current tail element.
///
/// This [`Receiver`] handle will receive a clone of all values sent
/// **after** it has resubscribed. This will not include elements that are
/// in the queue of the current receiver. Consider the following example.
///
/// # Examples
///
/// ```
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = broadcast::channel(2);
///
/// tx.send(1).unwrap();
/// let mut rx2 = rx.resubscribe();
/// tx.send(2).unwrap();
///
/// assert_eq!(rx2.recv().await.unwrap(), 2);
/// assert_eq!(rx.recv().await.unwrap(), 1);
/// }
/// ```
pub fn resubscribe(&self) -> Self {
let shared = self.shared.clone();
new_receiver(shared)
}
/// Receives the next value for this receiver.
///
/// Each [`Receiver`] handle will receive a clone of all values sent
Expand Down
38 changes: 38 additions & 0 deletions tokio/tests/sync_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,3 +479,41 @@ fn receiver_len_with_lagged() {
fn is_closed(err: broadcast::error::RecvError) -> bool {
matches!(err, broadcast::error::RecvError::Closed)
}

#[test]
fn resubscribe_points_to_tail() {
let (tx, mut rx) = broadcast::channel(3);
tx.send(1).unwrap();

let mut rx_resub = rx.resubscribe();

// verify we're one behind at the start
assert_empty!(rx_resub);
assert_eq!(assert_recv!(rx), 1);

// verify we do not affect rx
tx.send(2).unwrap();
assert_eq!(assert_recv!(rx_resub), 2);
tx.send(3).unwrap();
assert_eq!(assert_recv!(rx), 2);
assert_eq!(assert_recv!(rx), 3);
assert_empty!(rx);

assert_eq!(assert_recv!(rx_resub), 3);
assert_empty!(rx_resub);
}

#[test]
fn resubscribe_lagged() {
let (tx, mut rx) = broadcast::channel(1);
tx.send(1).unwrap();
tx.send(2).unwrap();

let mut rx_resub = rx.resubscribe();
assert_lagged!(rx.try_recv(), 1);
assert_empty!(rx_resub);

assert_eq!(assert_recv!(rx), 2);
assert_empty!(rx);
assert_empty!(rx_resub);
}

0 comments on commit f6c0405

Please sign in to comment.