Skip to content

Commit

Permalink
chore(tests): Reduce futures crates (#1448)
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto authored Aug 3, 2023
1 parent 60d776b commit 23602b4
Show file tree
Hide file tree
Showing 18 changed files with 53 additions and 50 deletions.
3 changes: 2 additions & 1 deletion tests/compression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ version = "0.1.0"

[dependencies]
bytes = "1"
futures = "0.3"
futures-core = "0.3"
http = "0.2"
http-body = "0.4"
hyper = "0.14.3"
pin-project = "1.0"
prost = "0.11"
tokio = {version = "1.0", features = ["macros", "rt-multi-thread", "net"]}
tokio-stream = "0.1"
tonic = {path = "../../tonic", features = ["gzip"]}
tower = {version = "0.4", features = []}
tower-http = {version = "0.4", features = ["map-response-body", "map-request-body"]}
Expand Down
4 changes: 2 additions & 2 deletions tests/compression/src/bidirectional_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand All @@ -48,7 +48,7 @@ async fn client_enabled_server_enabled() {
.accept_compressed(CompressionEncoding::Gzip);

let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let stream = tokio_stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let req = Request::new(stream);

let res = client
Expand Down
16 changes: 8 additions & 8 deletions tests/compression/src/client_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand All @@ -39,7 +39,7 @@ async fn client_enabled_server_enabled() {
.send_compressed(CompressionEncoding::Gzip);

let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let stream = tokio_stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let req = Request::new(Box::pin(stream));

client.compress_input_client_stream(req).await.unwrap();
Expand Down Expand Up @@ -75,7 +75,7 @@ async fn client_disabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand All @@ -84,7 +84,7 @@ async fn client_disabled_server_enabled() {
let mut client = test_client::TestClient::new(mock_io_channel(client).await);

let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let stream = tokio_stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let req = Request::new(Box::pin(stream));

client.compress_input_client_stream(req).await.unwrap();
Expand All @@ -102,7 +102,7 @@ async fn client_enabled_server_disabled() {
tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
});
Expand All @@ -111,7 +111,7 @@ async fn client_enabled_server_disabled() {
.send_compressed(CompressionEncoding::Gzip);

let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let stream = tokio_stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let req = Request::new(Box::pin(stream));

let status = client.compress_input_client_stream(req).await.unwrap_err();
Expand Down Expand Up @@ -147,7 +147,7 @@ async fn compressing_response_from_client_stream() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand All @@ -156,7 +156,7 @@ async fn compressing_response_from_client_stream() {
let mut client = test_client::TestClient::new(mock_io_channel(client).await)
.accept_compressed(CompressionEncoding::Gzip);

let stream = futures::stream::iter(vec![]);
let stream = tokio_stream::iter(vec![]);
let req = Request::new(Box::pin(stream));

let res = client.compress_output_client_stream(req).await.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions tests/compression/src/compressing_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down Expand Up @@ -61,7 +61,7 @@ async fn client_enabled_server_disabled() {
tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
});
Expand Down Expand Up @@ -99,7 +99,7 @@ async fn client_mark_compressed_without_header_server_enabled() {
async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down
16 changes: 8 additions & 8 deletions tests/compression/src/compressing_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn client_enabled_server_disabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down Expand Up @@ -160,7 +160,7 @@ async fn client_disabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down Expand Up @@ -198,7 +198,7 @@ async fn server_replying_with_unsupported_encoding() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
});
Expand Down Expand Up @@ -240,7 +240,7 @@ async fn disabling_compression_on_single_response() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down Expand Up @@ -281,7 +281,7 @@ async fn disabling_compression_on_response_but_keeping_compression_on_stream() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down Expand Up @@ -337,7 +337,7 @@ async fn disabling_compression_on_response_from_client_stream() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand All @@ -346,7 +346,7 @@ async fn disabling_compression_on_response_from_client_stream() {
let mut client = test_client::TestClient::new(mock_io_channel(client).await)
.accept_compressed(CompressionEncoding::Gzip);

let stream = futures::stream::iter(vec![]);
let stream = tokio_stream::iter(vec![]);
let req = Request::new(Box::pin(stream));

let res = client.compress_output_client_stream(req).await.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions tests/compression/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use self::util::*;
use crate::util::mock_io_channel;
use futures::{Stream, StreamExt};
use std::{
pin::Pin,
sync::{
Expand All @@ -11,6 +10,7 @@ use std::{
},
};
use tokio::net::TcpListener;
use tokio_stream::{Stream, StreamExt};
use tonic::{
transport::{Channel, Endpoint, Server, Uri},
Request, Response, Status, Streaming,
Expand Down Expand Up @@ -67,7 +67,7 @@ impl test_server::Test for Svc {
_req: Request<()>,
) -> Result<Response<Self::CompressOutputServerStreamStream>, Status> {
let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = futures::stream::repeat(SomeData { data })
let stream = tokio_stream::iter(std::iter::repeat(SomeData { data }))
.take(2)
.map(Ok::<_, Status>);
Ok(self.prepare_response(Response::new(Box::pin(stream))))
Expand Down Expand Up @@ -113,7 +113,7 @@ impl test_server::Test for Svc {
}

let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = futures::stream::repeat(SomeData { data })
let stream = tokio_stream::iter(std::iter::repeat(SomeData { data }))
.take(2)
.map(Ok::<_, Status>);
Ok(self.prepare_response(Response::new(Box::pin(stream))))
Expand Down
6 changes: 3 additions & 3 deletions tests/compression/src/server_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down Expand Up @@ -80,7 +80,7 @@ async fn client_disabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down Expand Up @@ -125,7 +125,7 @@ async fn client_enabled_server_disabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion tests/compression/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::*;
use bytes::Bytes;
use futures::ready;
use futures_core::ready;
use http_body::Body;
use pin_project::pin_project;
use std::{
Expand Down
5 changes: 2 additions & 3 deletions tests/integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ version = "0.1.0"

[dependencies]
bytes = "1.0"
futures-util = "0.3"
futures-util = {version="0.3", default-features =false}
prost = "0.11"
tokio = {version = "1.0", features = ["macros", "rt-multi-thread", "net"]}
tokio = {version = "1.0", features = ["macros", "rt-multi-thread", "net", "sync"]}
tonic = {path = "../../tonic"}
tracing-subscriber = {version = "0.3"}

[dev-dependencies]
async-stream = "0.3"
futures = "0.3"
http = "0.2"
http-body = "0.4"
hyper = "0.14"
Expand Down
3 changes: 2 additions & 1 deletion tests/integration_tests/tests/client_layer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::time::Duration;

use futures::{channel::oneshot, FutureExt};
use futures_util::FutureExt;
use http::{header::HeaderName, HeaderValue};
use integration_tests::pb::{test_client::TestClient, test_server, Input, Output};
use tokio::sync::oneshot;
use tonic::{
transport::{Endpoint, Server},
Request, Response, Status,
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/tests/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ where
{
type Response = S::Response;
type Error = S::Error;
type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
type Future = futures_util::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
Expand Down
3 changes: 2 additions & 1 deletion tests/integration_tests/tests/interceptor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::time::Duration;

use futures::{channel::oneshot, FutureExt};
use futures_util::FutureExt;
use integration_tests::pb::{test_client::TestClient, test_server, Input, Output};
use tokio::sync::oneshot;
use tonic::{
transport::{Endpoint, Server},
GrpcMethod, Request, Response, Status,
Expand Down
8 changes: 4 additions & 4 deletions tests/integration_tests/tests/max_message_size.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::pin::Pin;

use futures::{stream, Stream};
use integration_tests::{
pb::{test1_client, test1_server, Input1, Output1},
trace_init,
};
use tokio_stream::Stream;
use tonic::{
transport::{Endpoint, Server},
Code, Request, Response, Status,
Expand Down Expand Up @@ -137,7 +137,7 @@ async fn response_stream_limit() {
let blob = Output1 {
buf: vec![0; 6877902],
};
let stream = stream::iter(vec![Ok(blob.clone()), Ok(blob.clone())]);
let stream = tokio_stream::iter(vec![Ok(blob.clone()), Ok(blob.clone())]);

Ok(Response::new(Box::pin(stream)))
}
Expand All @@ -148,7 +148,7 @@ async fn response_stream_limit() {
tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
});
Expand Down Expand Up @@ -317,7 +317,7 @@ async fn max_message_run(case: &TestCase) -> Result<(), Status> {
tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
});
Expand Down
3 changes: 1 addition & 2 deletions tests/integration_tests/tests/origin.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use futures::future::BoxFuture;
use futures_util::FutureExt;
use futures_util::{future::BoxFuture, FutureExt};
use integration_tests::pb::test_client;
use integration_tests::pb::{test_server, Input, Output};
use std::task::Context;
Expand Down
7 changes: 4 additions & 3 deletions tests/integration_tests/tests/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ async fn status_with_metadata() {
jh.await.unwrap();
}

type Stream<T> =
std::pin::Pin<Box<dyn futures::Stream<Item = std::result::Result<T, Status>> + Send + 'static>>;
type Stream<T> = std::pin::Pin<
Box<dyn tokio_stream::Stream<Item = std::result::Result<T, Status>> + Send + 'static>,
>;

#[tokio::test]
async fn status_from_server_stream() {
Expand All @@ -142,7 +143,7 @@ async fn status_from_server_stream() {
&self,
_: Request<InputStream>,
) -> Result<Response<Self::StreamCallStream>, Status> {
let s = futures::stream::iter(vec![
let s = tokio_stream::iter(vec![
Err::<OutputStream, _>(Status::unavailable("foo")),
Err::<OutputStream, _>(Status::unavailable("bar")),
]);
Expand Down
Loading

0 comments on commit 23602b4

Please sign in to comment.