Skip to content

Commit

Permalink
Revert "Overhaul stalled stream protection and add upload support (#3485
Browse files Browse the repository at this point in the history
)" (#3534)

The stalled stream protection changes are triggering a semver hazard
that will break the release. Need to revert it temporarily to fix this
issue.

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
  • Loading branch information
jdisanti authored Mar 28, 2024
1 parent 327e31e commit 498dc5f
Show file tree
Hide file tree
Showing 20 changed files with 600 additions and 2,006 deletions.
42 changes: 0 additions & 42 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,45 +22,3 @@ message = "Make `BehaviorVersion` be future-proof by disallowing it to be constr
references = ["aws-sdk-rust#1111", "smithy-rs#3513"]
meta = { "breaking" = true, "tada" = false, "bug" = true, "target" = "client" }
author = "Ten0"

[[smithy-rs]]
message = """
Stalled stream protection now supports request upload streams. It is currently off by default, but will be enabled by default in a future release. To enable it now, you can do the following:
```rust
let config = my_service::Config::builder()
.stalled_stream_protection(StalledStreamProtectionConfig::enabled().build())
// ...
.build();
```
"""
references = ["smithy-rs#3485"]
meta = { "breaking" = false, "tada" = true, "bug" = false }
authors = ["jdisanti"]

[[aws-sdk-rust]]
message = """
Stalled stream protection now supports request upload streams. It is currently off by default, but will be enabled by default in a future release. To enable it now, you can do the following:
```rust
let config = aws_config::defaults(BehaviorVersion::latest())
.stalled_stream_protection(StalledStreamProtectionConfig::enabled().build())
.load()
.await;
```
"""
references = ["smithy-rs#3485"]
meta = { "breaking" = false, "tada" = true, "bug" = false }
author = "jdisanti"

[[smithy-rs]]
message = "Stalled stream protection on downloads will now only trigger if the upstream source is too slow. Previously, stalled stream protection could be erroneously triggered if the user was slowly consuming the stream slower than the minimum speed limit."
references = ["smithy-rs#3485"]
meta = { "breaking" = false, "tada" = false, "bug" = true }
authors = ["jdisanti"]

[[aws-sdk-rust]]
message = "Stalled stream protection on downloads will now only trigger if the upstream source is too slow. Previously, stalled stream protection could be erroneously triggered if the user was slowly consuming the stream slower than the minimum speed limit."
references = ["smithy-rs#3485"]
meta = { "breaking" = false, "tada" = false, "bug" = true }
author = "jdisanti"
3 changes: 0 additions & 3 deletions aws/sdk/integration-tests/s3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,3 @@ tracing-subscriber = { version = "0.3.15", features = ["env-filter", "json"] }
# If you're writing a test with this, take heed! `no-env-filter` means you'll be capturing
# logs from everything that speaks, so be specific with your asserts.
tracing-test = { version = "0.2.4", features = ["no-env-filter"] }

[dependencies]
pin-project-lite = "0.2.13"
89 changes: 0 additions & 89 deletions aws/sdk/integration-tests/s3/tests/body_size_hint.rs

This file was deleted.

125 changes: 38 additions & 87 deletions aws/sdk/integration-tests/s3/tests/stalled-stream-protection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,119 +4,58 @@
*/

use aws_credential_types::Credentials;
use aws_sdk_s3::{
config::{Region, StalledStreamProtectionConfig},
error::BoxError,
};
use aws_sdk_s3::{error::DisplayErrorContext, primitives::ByteStream};
use aws_sdk_s3::config::{Region, StalledStreamProtectionConfig};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::{Client, Config};
use aws_smithy_runtime::{assert_str_contains, test_util::capture_test_logs::capture_test_logs};
use aws_smithy_types::body::SdkBody;
use bytes::{Bytes, BytesMut};
use http_body::Body;
use bytes::BytesMut;
use std::error::Error;
use std::future::Future;
use std::net::SocketAddr;
use std::time::Duration;
use std::{future::Future, task::Poll};
use std::{net::SocketAddr, pin::Pin, task::Context};
use tokio::{
net::{TcpListener, TcpStream},
time::sleep,
};
use tracing::debug;

enum SlowBodyState {
Wait(Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync + 'static>>),
Send,
Taken,
}

struct SlowBody {
state: SlowBodyState,
}

impl SlowBody {
fn new() -> Self {
Self {
state: SlowBodyState::Send,
}
}
}

impl Body for SlowBody {
type Data = Bytes;
type Error = BoxError;

fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
loop {
let mut state = SlowBodyState::Taken;
std::mem::swap(&mut state, &mut self.state);
match state {
SlowBodyState::Wait(mut fut) => match fut.as_mut().poll(cx) {
Poll::Ready(_) => self.state = SlowBodyState::Send,
Poll::Pending => {
self.state = SlowBodyState::Wait(fut);
return Poll::Pending;
}
},
SlowBodyState::Send => {
self.state = SlowBodyState::Wait(Box::pin(sleep(Duration::from_micros(100))));
return Poll::Ready(Some(Ok(Bytes::from_static(
b"data_data_data_data_data_data_data_data_data_data_data_data_\
data_data_data_data_data_data_data_data_data_data_data_data_\
data_data_data_data_data_data_data_data_data_data_data_data_\
data_data_data_data_data_data_data_data_data_data_data_data_",
))));
}
SlowBodyState::Taken => unreachable!(),
}
}
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}

// This test doesn't work because we can't count on `hyper` to poll the body,
// regardless of whether we schedule a wake. To make this functionality work,
// we'd have to integrate more closely with the orchestrator.
//
// I'll leave this test here because we do eventually want to support stalled
// stream protection for uploads.
#[ignore]
#[tokio::test]
async fn test_stalled_stream_protection_defaults_for_upload() {
let _logs = capture_test_logs();

// We spawn a faulty server that will stop all request processing after reading half of the request body.
// We spawn a faulty server that will close the connection after
// writing half of the response body.
let (server, server_addr) = start_faulty_upload_server().await;
let _ = tokio::spawn(server);

let conf = Config::builder()
.credentials_provider(Credentials::for_tests())
.region(Region::new("us-east-1"))
.endpoint_url(format!("http://{server_addr}"))
// TODO(https://github.com/smithy-lang/smithy-rs/issues/3510): make stalled stream protection enabled by default with BMV and remove this line
.stalled_stream_protection(StalledStreamProtectionConfig::enabled().build())
// .stalled_stream_protection(StalledStreamProtectionConfig::enabled().build())
.build();
let client = Client::from_conf(conf);

let err = client
.put_object()
.bucket("a-test-bucket")
.key("stalled-stream-test.txt")
.body(ByteStream::new(SdkBody::from_body_0_4(SlowBody::new())))
.body(ByteStream::from_static(b"Hello"))
.send()
.await
.expect_err("upload stream stalled out");

let err_msg = DisplayErrorContext(&err).to_string();
assert_str_contains!(
err_msg,
let err = err.source().expect("inner error exists");
assert_eq!(
err.to_string(),
"minimum throughput was specified at 1 B/s, but throughput of 0 B/s was observed"
);
}

async fn start_faulty_upload_server() -> (impl Future<Output = ()>, SocketAddr) {
use tokio::net::{TcpListener, TcpStream};
use tokio::time::sleep;

let listener = TcpListener::bind("0.0.0.0:0")
.await
.expect("socket is free");
Expand All @@ -126,7 +65,12 @@ async fn start_faulty_upload_server() -> (impl Future<Output = ()>, SocketAddr)
let mut buf = BytesMut::new();
let mut time_to_stall = false;

while !time_to_stall {
loop {
if time_to_stall {
debug!("faulty server has read partial request, now getting stuck");
break;
}

match socket.try_read_buf(&mut buf) {
Ok(0) => {
unreachable!(
Expand All @@ -135,7 +79,12 @@ async fn start_faulty_upload_server() -> (impl Future<Output = ()>, SocketAddr)
}
Ok(n) => {
debug!("read {n} bytes from the socket");

// Check to see if we've received some headers
if buf.len() >= 128 {
let s = String::from_utf8_lossy(&buf);
debug!("{s}");

time_to_stall = true;
}
}
Expand All @@ -149,7 +98,6 @@ async fn start_faulty_upload_server() -> (impl Future<Output = ()>, SocketAddr)
}
}

debug!("faulty server has read partial request, now getting stuck");
loop {
tokio::task::yield_now().await
}
Expand Down Expand Up @@ -281,11 +229,14 @@ async fn test_stalled_stream_protection_for_downloads_is_enabled_by_default() {
err.to_string(),
"minimum throughput was specified at 1 B/s, but throughput of 0 B/s was observed"
);
// the 1s check interval is included in the 5s grace period
assert_eq!(start.elapsed().as_secs(), 5);
// 1s check interval + 5s grace period
assert_eq!(start.elapsed().as_secs(), 6);
}

async fn start_faulty_download_server() -> (impl Future<Output = ()>, SocketAddr) {
use tokio::net::{TcpListener, TcpStream};
use tokio::time::sleep;

let listener = TcpListener::bind("0.0.0.0:0")
.await
.expect("socket is free");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,15 @@ class StalledStreamProtectionOperationCustomization(
is OperationSection.AdditionalInterceptors -> {
val stalledStreamProtectionModule = RuntimeType.smithyRuntime(rc).resolve("client::stalled_stream_protection")
section.registerInterceptor(rc, this) {
// Currently, only response bodies are protected/supported because
// we can't count on hyper to poll a request body on wake.
rustTemplate(
"""
#{StalledStreamProtectionInterceptor}::default()
#{StalledStreamProtectionInterceptor}::new(#{Kind}::ResponseBody)
""",
*preludeScope,
"StalledStreamProtectionInterceptor" to stalledStreamProtectionModule.resolve("StalledStreamProtectionInterceptor"),
"Kind" to stalledStreamProtectionModule.resolve("StalledStreamProtectionInterceptorKind"),
)
}
}
Expand Down
Loading

0 comments on commit 498dc5f

Please sign in to comment.