Skip to content

Commit

Permalink
feat: Update tokio dependency to 0.3
Browse files Browse the repository at this point in the history
  • Loading branch information
Markus Westerlind committed Nov 17, 2020
1 parent 98df733 commit 9fee882
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 43 deletions.
19 changes: 9 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ url = "2.1"
# We need this for script support
sha1 = { version = ">= 0.2, < 0.7", optional = true }

combine = { version = "4.2.1", default-features = false, features = ["std"] }
combine = { default-features = false, features = ["std"], path = "../combine" }

# Only needed for AIO
bytes = { version = "0.5", optional = true }
futures-util = { version = "0.3.0", default-features = false, optional = true }
pin-project-lite = { version = "0.1", optional = true }
tokio-util = { version = "0.3.1", optional = true }
tokio = { version = "0.2.10", optional = true }
tokio-util = { version = "0.4", optional = true }
tokio = { version = "0.3", optional = true }

# Only needed for the connection manager
arc-swap = { version = "0.4.4", optional = true }
Expand All @@ -56,22 +56,21 @@ async-trait = "0.1.24"

# Only needed for TLS
native-tls = { version = "0.2", optional = true }
tokio-tls = { version = "0.3", optional = true }
tokio-native-tls = { version = "0.2", optional = true }
async-native-tls = { version = "0.3", optional = true }

[features]
default = ["acl", "streams", "geospatial", "script"]
acl = []
aio = ["bytes", "pin-project-lite", "futures-util", "futures-util/sink", "tokio/io-util", "tokio-util", "tokio-util/codec", "tokio/sync", "combine/tokio-02"]
tokio-rt-core = ["tokio-comp", "tokio/rt-core"]
aio = ["bytes", "pin-project-lite", "futures-util", "futures-util/sink", "tokio/io-util", "tokio-util", "tokio-util/codec", "tokio/sync", "combine/tokio-03"]
geospatial = []
cluster = ["crc16", "rand"]
script = ["sha1"]
tls = ["native-tls"]
async-std-comp = ["aio", "async-std"]
async-std-tls-comp = ["async-std-comp", "async-native-tls", "tls"]
tokio-comp = ["aio", "tokio", "tokio/tcp", "tokio/uds"]
tokio-tls-comp = ["tls", "tokio-tls"]
tokio-comp = ["aio", "tokio", "tokio/net"]
tokio-native-tls-comp = ["tls", "tokio-native-tls"]
connection-manager = ["arc-swap", "futures", "aio"]
streams = []

Expand All @@ -85,12 +84,12 @@ futures = "0.3"
criterion = "0.3"
partial-io = { version = "0.3", features = ["tokio", "quickcheck"] }
quickcheck = "0.6"
tokio = { version = "0.2", features = ["rt-core", "macros", "time"] }
tokio = { version = "0.3", features = ["rt", "macros", "time"] }
tempdir = "0.3"

[[test]]
name = "test_async"
required-features = ["tokio-rt-core"]
required-features = ["tokio-comp"]

[[test]]
name = "test_async_async_std"
Expand Down
4 changes: 3 additions & 1 deletion src/aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ impl Runtime {

#[cfg(all(feature = "tokio-comp", feature = "async-std-comp"))]
{
if ::tokio::runtime::Handle::try_current().is_ok() {
// TODO How to detect tokio? Or remove detection?
// if ::tokio::runtime::Handle::try_current().is_ok() {
if true {
Runtime::Tokio
} else {
Runtime::AsyncStd
Expand Down
21 changes: 14 additions & 7 deletions src/aio/async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use async_std::net::TcpStream;
#[cfg(unix)]
use async_std::os::unix::net::UnixStream;
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
use futures_util::ready;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

pin_project_lite::pin_project! {
/// Wraps the async_std `AsyncRead/AsyncWrite` in order to implement the required the tokio traits
Expand Down Expand Up @@ -64,9 +65,15 @@ where
fn poll_read(
self: Pin<&mut Self>,
cx: &mut core::task::Context,
buf: &mut [u8],
) -> std::task::Poll<Result<usize, tokio::io::Error>> {
async_std::io::Read::poll_read(self.project().inner, cx, buf)
buf: &mut ReadBuf<'_>,
) -> std::task::Poll<Result<(), tokio::io::Error>> {
let n = ready!(async_std::io::Read::poll_read(
self.project().inner,
cx,
buf.initialize_unfilled()
))?;
buf.advance(n);
std::task::Poll::Ready(Ok(()))
}
}

Expand Down Expand Up @@ -122,11 +129,11 @@ impl AsyncRead for AsyncStd {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut task::Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
match &mut *self {
AsyncStd::Tcp(r) => Pin::new(r).poll_read(cx, buf),
#[cfg(feature = "tokio-tls-comp")]
#[cfg(feature = "async-std-tls-comp")]
AsyncStd::TcpTls(r) => Pin::new(r).poll_read(cx, buf),
#[cfg(unix)]
AsyncStd::Unix(r) => Pin::new(r).poll_read(cx, buf),
Expand Down
28 changes: 14 additions & 14 deletions src/aio/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ use std::{
use tokio::net::UnixStream as UnixStreamTokio;

use tokio::{
io::{AsyncRead, AsyncWrite},
io::{AsyncRead, AsyncWrite, ReadBuf},
net::TcpStream as TcpStreamTokio,
};

#[cfg(feature = "tls")]
use super::TlsConnector;

#[cfg(feature = "tokio-tls-comp")]
use tokio_tls::TlsStream;
#[cfg(feature = "tokio-native-tls-comp")]
use tokio_native_tls::TlsStream;

#[cfg(unix)]
use super::Path;
Expand All @@ -28,7 +28,7 @@ pub(crate) enum Tokio {
/// Represents a Tokio TCP connection.
Tcp(TcpStreamTokio),
/// Represents a Tokio TLS encrypted TCP connection
#[cfg(feature = "tokio-tls-comp")]
#[cfg(feature = "tokio-native-tls-comp")]
TcpTls(TlsStream<TcpStreamTokio>),
/// Represents a Tokio Unix connection.
#[cfg(unix)]
Expand All @@ -43,7 +43,7 @@ impl AsyncWrite for Tokio {
) -> Poll<io::Result<usize>> {
match &mut *self {
Tokio::Tcp(r) => Pin::new(r).poll_write(cx, buf),
#[cfg(feature = "tokio-tls-comp")]
#[cfg(feature = "tokio-native-tls-comp")]
Tokio::TcpTls(r) => Pin::new(r).poll_write(cx, buf),
#[cfg(unix)]
Tokio::Unix(r) => Pin::new(r).poll_write(cx, buf),
Expand All @@ -53,7 +53,7 @@ impl AsyncWrite for Tokio {
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<io::Result<()>> {
match &mut *self {
Tokio::Tcp(r) => Pin::new(r).poll_flush(cx),
#[cfg(feature = "tokio-tls-comp")]
#[cfg(feature = "tokio-native-tls-comp")]
Tokio::TcpTls(r) => Pin::new(r).poll_flush(cx),
#[cfg(unix)]
Tokio::Unix(r) => Pin::new(r).poll_flush(cx),
Expand All @@ -63,7 +63,7 @@ impl AsyncWrite for Tokio {
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<io::Result<()>> {
match &mut *self {
Tokio::Tcp(r) => Pin::new(r).poll_shutdown(cx),
#[cfg(feature = "tokio-tls-comp")]
#[cfg(feature = "tokio-native-tls-comp")]
Tokio::TcpTls(r) => Pin::new(r).poll_shutdown(cx),
#[cfg(unix)]
Tokio::Unix(r) => Pin::new(r).poll_shutdown(cx),
Expand All @@ -75,11 +75,11 @@ impl AsyncRead for Tokio {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut task::Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
match &mut *self {
Tokio::Tcp(r) => Pin::new(r).poll_read(cx, buf),
#[cfg(feature = "tokio-tls-comp")]
#[cfg(feature = "tokio-native-tls-comp")]
Tokio::TcpTls(r) => Pin::new(r).poll_read(cx, buf),
#[cfg(unix)]
Tokio::Unix(r) => Pin::new(r).poll_read(cx, buf),
Expand All @@ -101,7 +101,7 @@ impl RedisRuntime for Tokio {
socket_addr: SocketAddr,
insecure: bool,
) -> RedisResult<Self> {
let tls_connector: tokio_tls::TlsConnector = if insecure {
let tls_connector: tokio_native_tls::TlsConnector = if insecure {
TlsConnector::builder()
.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true)
Expand All @@ -122,20 +122,20 @@ impl RedisRuntime for Tokio {
Ok(UnixStreamTokio::connect(path).await.map(Tokio::Unix)?)
}

#[cfg(feature = "tokio-rt-core")]
#[cfg(feature = "tokio-dep")]
fn spawn(f: impl Future<Output = ()> + Send + 'static) {
tokio::spawn(f);
}

#[cfg(not(feature = "tokio-rt-core"))]
#[cfg(not(feature = "tokio-dep"))]
fn spawn(_: impl Future<Output = ()> + Send + 'static) {
unreachable!()
}

fn boxed(self) -> Pin<Box<dyn AsyncStream + Send + Sync>> {
match self {
Tokio::Tcp(x) => Box::pin(x),
#[cfg(feature = "tokio-tls-comp")]
#[cfg(feature = "tokio-native-tls-comp")]
Tokio::TcpTls(x) => Box::pin(x),
#[cfg(unix)]
Tokio::Unix(x) => Box::pin(x),
Expand Down
10 changes: 5 additions & 5 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ impl Client {
}

/// Returns an async connection from the client.
#[cfg(any(feature = "tokio-rt-core", feature = "async-std-comp"))]
#[cfg(any(feature = "tokio-dep", feature = "async-std-comp"))]
#[cfg_attr(
docsrs,
doc(cfg(any(feature = "tokio-rt-core", feature = "async-std-comp")))
doc(cfg(any(feature = "tokio-dep", feature = "async-std-comp")))
)]
pub async fn get_multiplexed_async_connection(
&self,
Expand All @@ -128,7 +128,7 @@ impl Client {
///
/// A multiplexed connection can be cloned, allowing requests to be be sent concurrently
/// on the same underlying connection (tcp/unix socket).
#[cfg(feature = "tokio-rt-core")]
#[cfg(feature = "tokio-comp")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
pub async fn get_multiplexed_tokio_connection(
&self,
Expand All @@ -155,8 +155,8 @@ impl Client {
///
/// A multiplexed connection can be cloned, allowing requests to be be sent concurrently
/// on the same underlying connection (tcp/unix socket).
#[cfg(feature = "tokio-rt-core")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-rt-core")))]
#[cfg(feature = "tokio-dep")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-dep")))]
pub async fn create_multiplexed_tokio_connection(
&self,
) -> RedisResult<(
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
//! * `script`: enables script support (enabled by default)
//! * `r2d2`: enables r2d2 connection pool support (optional)
//! * `cluster`: enables redis cluster support (optional)
//! * `tokio-rt-core`: enables support for tokio-rt (optional)
//! * `tokio-dep`: enables support for tokio (optional)
//! * `connection-manager`: enables support for automatic reconnection (optional)
//!
//! ## Connection Parameters
Expand Down
2 changes: 1 addition & 1 deletion src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ mod aio_support {
where
R: AsyncRead + std::marker::Unpin,
{
let result = combine::decode_tokio_02!(*decoder, *read, value(), |input, _| {
let result = combine::decode_tokio_03!(*decoder, *read, value(), |input, _| {
combine::stream::easy::Stream::from(input)
});
match result {
Expand Down
8 changes: 4 additions & 4 deletions tests/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ use futures::Future;
use redis::Value;

pub fn current_thread_runtime() -> tokio::runtime::Runtime {
let mut builder = tokio::runtime::Builder::new();
let mut builder = tokio::runtime::Builder::new_current_thread();

#[cfg(feature = "aio")]
builder.enable_io();

builder.basic_scheduler().build().unwrap()
builder.build().unwrap()
}

pub fn block_on_all<F>(f: F) -> F::Output
Expand Down Expand Up @@ -308,14 +308,14 @@ impl TestContext {
self.server.stop();
}

#[cfg(feature = "tokio-rt-core")]
#[cfg(feature = "tokio-dep")]
pub fn multiplexed_async_connection(
&self,
) -> impl Future<Output = redis::RedisResult<redis::aio::MultiplexedConnection>> {
self.multiplexed_async_connection_tokio()
}

#[cfg(feature = "tokio-rt-core")]
#[cfg(feature = "tokio-dep")]
pub fn multiplexed_async_connection_tokio(
&self,
) -> impl Future<Output = redis::RedisResult<redis::aio::MultiplexedConnection>> {
Expand Down

0 comments on commit 9fee882

Please sign in to comment.