-
Notifications
You must be signed in to change notification settings - Fork 192
/
stalled_stream_performance.rs
115 lines (108 loc) · 4.17 KB
/
stalled_stream_performance.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
#![cfg(all(feature = "client", feature = "test-util"))]
use aws_smithy_async::rt::sleep::TokioSleep;
use aws_smithy_async::time::{SystemTimeSource, TimeSource};
use aws_smithy_runtime::client::http::body::minimum_throughput::MinimumThroughputDownloadBody;
use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::byte_stream::ByteStream;
use bytes::{BufMut, Bytes, BytesMut};
use hyper_0_14::server::conn::AddrStream;
use hyper_0_14::service::{make_service_fn, service_fn, Service};
use hyper_0_14::Server;
use std::convert::Infallible;
use std::net::TcpListener;
use std::time::Duration;
fn make_block(sz: usize) -> Bytes {
let mut b = BytesMut::with_capacity(sz);
b.put_bytes(1, sz);
b.freeze()
}
// TODO(postGA): convert this to an actual benchmark
// This test evaluates streaming 1GB of data over the loopback with and without the body wrapper
// enabled. After optimizations, the body wrapper seems to make minimal differences
// NOTE: make sure you run this test in release mode to get a sensible result
#[tokio::test]
#[ignore]
async fn stalled_stream_performance() {
// 1GB
let data_size = 1_000_000_000;
// observed block size during actual HTTP requests
let block_size = 16384;
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let make_service = make_service_fn(move |_connection: &AddrStream| async move {
Ok::<_, Infallible>(service_fn(
move |_: http_02x::Request<hyper_0_14::Body>| async move {
let (mut sender, body) = hyper_0_14::Body::channel();
tokio::task::spawn(async move {
for _i in 0..(data_size / block_size) {
sender
.send_data(make_block(block_size))
.await
.expect("failed to write data");
}
});
Ok::<_, Infallible>(http_02x::Response::new(body))
},
))
});
let addr = format!("http://localhost:{}", listener.local_addr().unwrap().port());
let server = Server::from_tcp(listener).unwrap().serve(make_service);
tokio::spawn(server);
let mut no_wrapping = vec![];
let mut wrapping = vec![];
let runs = 10;
for _i in 0..runs {
no_wrapping.push(make_request(&addr, false).await);
wrapping.push(make_request(&addr, true).await);
}
println!(
"Average w/ wrapping: {}",
wrapping.iter().map(|it| it.as_millis() as f64).sum::<f64>() / runs as f64
);
println!(
"Average w/o wrapping: {}",
no_wrapping
.iter()
.map(|it: &Duration| it.as_millis() as f64)
.sum::<f64>()
/ runs as f64
)
}
async fn make_request(address: &str, wrap_body: bool) -> Duration {
let mut client = hyper_0_14::Client::new();
let req = ::http_02x::Request::builder()
.uri(address)
.body(hyper_0_14::Body::empty())
.unwrap();
let resp = client.call(req).await;
let body = resp.unwrap().into_body();
let mut body = SdkBody::from_body_0_4(body);
if wrap_body {
body = body.map_preserve_contents(|body| {
let time_source = SystemTimeSource::new();
let sleep = TokioSleep::new();
let opts = StalledStreamProtectionConfig::enabled().build();
let mtb = MinimumThroughputDownloadBody::new(time_source, sleep, body, opts.into());
SdkBody::from_body_0_4(mtb)
});
}
let sdk_body = ByteStream::new(body);
let ts = SystemTimeSource::new();
let start = ts.now();
// this a slow way to count total bytes, but we need to actually read the bytes into segments
// otherwise some of our code seems to be optimized away
let total_bytes = sdk_body
.collect()
.await
.unwrap()
.into_segments()
.map(|seg| seg.len())
.sum::<usize>();
println!("total: {:?}", total_bytes);
let end = ts.now();
end.duration_since(start).unwrap()
}