From 8c6e55087a4acc9be56f7985fd97c5a7deeb2542 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Fri, 5 Feb 2021 20:06:55 +0100 Subject: [PATCH] stream: update features and doc for broadcast/watch stream --- tokio-stream/Cargo.toml | 3 ++- tokio-stream/src/macros.rs | 10 ++++++++++ tokio-stream/src/wrappers.rs | 18 +++++++++++++----- tokio-stream/src/wrappers/broadcast.rs | 1 + tokio-stream/src/wrappers/watch.rs | 1 + tokio/src/sync/broadcast.rs | 5 +++++ tokio/src/sync/watch.rs | 5 +++++ 7 files changed, 37 insertions(+), 6 deletions(-) diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml index 0bc03ac0331..6172f98b66f 100644 --- a/tokio-stream/Cargo.toml +++ b/tokio-stream/Cargo.toml @@ -25,12 +25,13 @@ time = ["tokio/time"] net = ["tokio/net"] io-util = ["tokio/io-util"] fs = ["tokio/fs"] +sync = ["tokio/sync", "tokio-util"] [dependencies] futures-core = { version = "0.3.0" } pin-project-lite = "0.2.0" tokio = { version = "1.0", features = ["sync"] } -tokio-util = { version = "0.6.3" } +tokio-util = { version = "0.6.3", optional = true } [dev-dependencies] tokio = { version = "1.0", features = ["full", "test-util"] } diff --git a/tokio-stream/src/macros.rs b/tokio-stream/src/macros.rs index 39ad86cc5d8..d4a72c8ec43 100644 --- a/tokio-stream/src/macros.rs +++ b/tokio-stream/src/macros.rs @@ -38,6 +38,16 @@ macro_rules! cfg_time { } } +macro_rules! cfg_sync { + ($($item:item)*) => { + $( + #[cfg(feature = "sync")] + #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] + $item + )* + } +} + macro_rules! ready { ($e:expr $(,)?) => { match $e { diff --git a/tokio-stream/src/wrappers.rs b/tokio-stream/src/wrappers.rs index 405f35a5b61..0e8ebdfb4a1 100644 --- a/tokio-stream/src/wrappers.rs +++ b/tokio-stream/src/wrappers.rs @@ -1,17 +1,25 @@ //! Wrappers for Tokio types that implement `Stream`. +/// Error types for the wrappers. +pub mod errors { + cfg_sync! { + pub use crate::wrappers::broadcast::BroadcastStreamRecvError; + } +} + mod mpsc_bounded; pub use mpsc_bounded::ReceiverStream; mod mpsc_unbounded; pub use mpsc_unbounded::UnboundedReceiverStream; -mod broadcast; -pub use broadcast::BroadcastStream; -pub use broadcast::BroadcastStreamRecvError; +cfg_sync! { + mod broadcast; + pub use broadcast::BroadcastStream; -mod watch; -pub use watch::WatchStream; + mod watch; + pub use watch::WatchStream; +} cfg_time! { mod interval; diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index f3ff002355c..06a982d1749 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -12,6 +12,7 @@ use std::task::{Context, Poll}; /// /// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver /// [`Stream`]: trait@crate::Stream +#[cfg_attr(docsrs, doc(cfg(feature = "sync")))] pub struct BroadcastStream { inner: ReusableBoxFuture<(Result, Receiver)>, } diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index e58de918d29..a98a72cfd56 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -12,6 +12,7 @@ use tokio::sync::watch::error::RecvError; /// /// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver /// [`Stream`]: trait@crate::Stream +#[cfg_attr(docsrs, doc(cfg(feature = "sync")))] pub struct WatchStream { inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver)>, } diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 58ea481ca85..3ef8f84b2ac 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -163,6 +163,11 @@ pub struct Sender { /// Must not be used concurrently. Messages may be retrieved using /// [`recv`][Receiver::recv]. /// +/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`] +/// wrapper. +/// +/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html +/// /// # Examples /// /// ``` diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 6732d38aba0..5590a754dd0 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -61,6 +61,11 @@ use std::ops; /// Receives values from the associated [`Sender`](struct@Sender). /// /// Instances are created by the [`channel`](fn@channel) function. +/// +/// To turn this receiver into a `Stream`, you can use the [`WatchStream`] +/// wrapper. +/// +/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html #[derive(Debug)] pub struct Receiver { /// Pointer to the shared state