From 33cd698f7f21255ad3327c89e0d1f883554559f3 Mon Sep 17 00:00:00 2001 From: ysaito1001 Date: Thu, 28 Sep 2023 14:47:05 -0500 Subject: [PATCH] Remove `futures_core::stream::Stream` from `aws_smithy_http::byte_stream::ByteStream` (#2983) ## Motivation and Context Removes `futures_core::stream::Stream` from `aws_smithy_http::byte_stream::ByteStream` ## Description This PR is part of our ongoing effort, https://github.com/awslabs/smithy-rs/issues/2413. We remove the `futures_core::stream::Stream` trait from `aws_smithy_http::byte_stream::ByteStream`. To continue support existing places that relied upon `ByteStream` implementing the `Stream` trait, we provide explicit `.next`, `.try_next`, and `.size_hints` methods on that type. As part of it, a doc-hidden type `FuturesStreamCompatByteStream`, which implements `Stream`, has been added to `aws_smithy_http` to cater to those who need a type implementing `Stream` (see [discussion](https://github.com/awslabs/smithy-rs/pull/2910#discussion_r1317112233) why doc-hidden). Another place we need to update is codegen responsible for rendering stream payload serializer, and this affects the server. The regular server and the python server have slightly different rendering requirements, since the former uses `aws_smithy_http::byte_stream::ByteStream` (that does _not_ implement the `Stream` trait) and latter uses its own [ByteStream](https://github.com/awslabs/smithy-rs/blob/cb79a68e3c38d1e62d3980d5e7baedc1144bacc7/rust-runtime/aws-smithy-http-server-python/src/types.rs#L343) (that does implement `Stream`). We use `ServerHttpBoundProtocolCustomization` to handle the difference: `StreamPayloadSerializerCustomization` and `PythonServerStreamPayloadSerializerCustomization`. ## Testing No new behavior has been added, relied on the existing tests in CI. ## Checklist - [x] I have updated `CHANGELOG.next.toml` if I made changes to the smithy-rs codegen or runtime crates - [x] I have updated `CHANGELOG.next.toml` if I made changes to the AWS SDK, generated SDK code, or SDK runtime crates ---- _By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._ --- CHANGELOG.next.toml | 12 ++ .../aws-inlineable/src/glacier_checksums.rs | 1 - .../rust/codegen/core/smithy/RuntimeType.kt | 3 +- .../protocols/PythonServerProtocolLoader.kt | 21 ++++ .../ServerHttpBoundProtocolGenerator.kt | 32 +++++- .../smithy/protocols/ServerProtocolLoader.kt | 55 ++++++++- .../smithy/protocols/ServerRestXmlFactory.kt | 10 +- .../src/types.rs | 1 - .../aws-smithy-http/src/byte_stream.rs | 108 ++++++++++-------- .../src/futures_stream_adapter.rs | 61 ++++++++++ rust-runtime/aws-smithy-http/src/lib.rs | 6 + 11 files changed, 254 insertions(+), 56 deletions(-) create mode 100644 rust-runtime/aws-smithy-http/src/futures_stream_adapter.rs diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index aa612327bd..07e4f95af0 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -229,3 +229,15 @@ message = "Add support for Sigv4A request signing. Sigv4a signing will be used a references = ["smithy-rs#1797"] meta = { "breaking" = true, "tada" = true, "bug" = false } author = "Velfi" + +[[aws-sdk-rust]] +message = "The `futures_core::stream::Stream` trait has been removed from [`ByteStream`](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html). The methods mentioned in the [doc](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html#getting-data-out-of-a-bytestream) will continue to be supported. Other stream operations that were previously available through the trait or its extension traits can be added later in a backward compatible manner." +references = ["smithy-rs#2983"] +meta = { "breaking" = true, "tada" = false, "bug" = false } +author = "ysaito1001" + +[[smithy-rs]] +message = "The `futures_core::stream::Stream` trait has been removed from [`ByteStream`](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html). The methods mentioned in the [doc](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html#getting-data-out-of-a-bytestream) will continue to be supported. Other stream operations that were previously available through the trait or its extension traits can be added later in a backward compatible manner." +references = ["smithy-rs#2983"] +meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" } +author = "ysaito1001" diff --git a/aws/rust-runtime/aws-inlineable/src/glacier_checksums.rs b/aws/rust-runtime/aws-inlineable/src/glacier_checksums.rs index 18f1d9219e..bf95910e00 100644 --- a/aws/rust-runtime/aws-inlineable/src/glacier_checksums.rs +++ b/aws/rust-runtime/aws-inlineable/src/glacier_checksums.rs @@ -14,7 +14,6 @@ use bytes::Buf; use bytes_utils::SegmentedBuf; use http::header::HeaderName; use ring::digest::{Context, Digest, SHA256}; -use tokio_stream::StreamExt; const TREE_HASH_HEADER: &str = "x-amz-sha256-tree-hash"; const X_AMZ_CONTENT_SHA256: &str = "x-amz-content-sha256"; diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt index c58596ad94..f7e463b0a7 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/RuntimeType.kt @@ -407,9 +407,10 @@ data class RuntimeType(val path: String, val dependency: RustDependency? = null) fun retryErrorKind(runtimeConfig: RuntimeConfig) = smithyTypes(runtimeConfig).resolve("retry::ErrorKind") fun eventStreamReceiver(runtimeConfig: RuntimeConfig): RuntimeType = smithyHttp(runtimeConfig).resolve("event_stream::Receiver") - fun eventStreamSender(runtimeConfig: RuntimeConfig): RuntimeType = smithyHttp(runtimeConfig).resolve("event_stream::EventStreamSender") + fun futuresStreamCompatByteStream(runtimeConfig: RuntimeConfig): RuntimeType = + smithyHttp(runtimeConfig).resolve("futures_stream_adapter::FuturesStreamCompatByteStream") fun errorMetadata(runtimeConfig: RuntimeConfig) = smithyTypes(runtimeConfig).resolve("error::ErrorMetadata") fun errorMetadataBuilder(runtimeConfig: RuntimeConfig) = diff --git a/codegen-server/python/src/main/kotlin/software/amazon/smithy/rust/codegen/server/python/smithy/protocols/PythonServerProtocolLoader.kt b/codegen-server/python/src/main/kotlin/software/amazon/smithy/rust/codegen/server/python/smithy/protocols/PythonServerProtocolLoader.kt index f84d35e7cb..dbfa9e029d 100644 --- a/codegen-server/python/src/main/kotlin/software/amazon/smithy/rust/codegen/server/python/smithy/protocols/PythonServerProtocolLoader.kt +++ b/codegen-server/python/src/main/kotlin/software/amazon/smithy/rust/codegen/server/python/smithy/protocols/PythonServerProtocolLoader.kt @@ -61,6 +61,7 @@ class PythonServerAfterDeserializedMemberServerHttpBoundCustomization() : is ServerHttpBoundProtocolSection.AfterTimestampDeserializedMember -> writable { rust(".into()") } + else -> emptySection } } @@ -78,6 +79,23 @@ class PythonServerAfterDeserializedMemberHttpBindingCustomization(private val ru } } +/** + * Customization class used to determine how serialized stream payload should be rendered for the Python server. + * + * In this customization, we do not need to wrap the payload in a new-type wrapper to enable the + * `futures_core::stream::Stream` trait since the payload in question has a type + * `aws_smithy_http_server_python::types::ByteStream` which already implements the `Stream` trait. + */ +class PythonServerStreamPayloadSerializerCustomization() : ServerHttpBoundProtocolCustomization() { + override fun section(section: ServerHttpBoundProtocolSection): Writable = when (section) { + is ServerHttpBoundProtocolSection.WrapStreamPayload -> writable { + section.params.payloadGenerator.generatePayload(this, section.params.shapeName, section.params.shape) + } + + else -> emptySection + } +} + class PythonServerProtocolLoader( private val supportedProtocols: ProtocolMap, ) : ProtocolLoader(supportedProtocols) { @@ -91,6 +109,7 @@ class PythonServerProtocolLoader( ), additionalServerHttpBoundProtocolCustomizations = listOf( PythonServerAfterDeserializedMemberServerHttpBoundCustomization(), + PythonServerStreamPayloadSerializerCustomization(), ), additionalHttpBindingCustomizations = listOf( PythonServerAfterDeserializedMemberHttpBindingCustomization(runtimeConfig), @@ -103,6 +122,7 @@ class PythonServerProtocolLoader( ), additionalServerHttpBoundProtocolCustomizations = listOf( PythonServerAfterDeserializedMemberServerHttpBoundCustomization(), + PythonServerStreamPayloadSerializerCustomization(), ), additionalHttpBindingCustomizations = listOf( PythonServerAfterDeserializedMemberHttpBindingCustomization(runtimeConfig), @@ -115,6 +135,7 @@ class PythonServerProtocolLoader( ), additionalServerHttpBoundProtocolCustomizations = listOf( PythonServerAfterDeserializedMemberServerHttpBoundCustomization(), + PythonServerStreamPayloadSerializerCustomization(), ), additionalHttpBindingCustomizations = listOf( PythonServerAfterDeserializedMemberHttpBindingCustomization(runtimeConfig), diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerHttpBoundProtocolGenerator.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerHttpBoundProtocolGenerator.kt index c579d64415..11a9a9558b 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerHttpBoundProtocolGenerator.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerHttpBoundProtocolGenerator.kt @@ -81,11 +81,28 @@ import software.amazon.smithy.rust.codegen.server.smithy.generators.protocol.Ser import software.amazon.smithy.rust.codegen.server.smithy.generators.serverBuilderSymbol import java.util.logging.Logger +data class StreamPayloadSerializerParams( + val codegenContext: ServerCodegenContext, + val payloadGenerator: ServerHttpBoundProtocolPayloadGenerator, + val shapeName: String, + val shape: OperationShape, +) + /** * Class describing a ServerHttpBoundProtocol section that can be used in a customization. */ sealed class ServerHttpBoundProtocolSection(name: String) : Section(name) { data class AfterTimestampDeserializedMember(val shape: MemberShape) : ServerHttpBoundProtocolSection("AfterTimestampDeserializedMember") + + /** + * Represent a section for rendering the serialized stream payload. + * + * If the payload does not implement the `futures_core::stream::Stream`, which is the case for + * `aws_smithy_http::byte_stream::ByteStream`, the section needs to be overridden and renders a new-type wrapper + * around the payload to enable the `Stream` trait. + */ + data class WrapStreamPayload(val params: StreamPayloadSerializerParams) : + ServerHttpBoundProtocolSection("WrapStreamPayload") } /** @@ -540,7 +557,18 @@ class ServerHttpBoundProtocolTraitImplGenerator( operationShape.outputShape(model).findStreamingMember(model)?.let { val payloadGenerator = ServerHttpBoundProtocolPayloadGenerator(codegenContext, protocol) withBlockTemplate("let body = #{SmithyHttpServer}::body::boxed(#{SmithyHttpServer}::body::Body::wrap_stream(", "));", *codegenScope) { - payloadGenerator.generatePayload(this, "output", operationShape) + for (customization in customizations) { + customization.section( + ServerHttpBoundProtocolSection.WrapStreamPayload( + StreamPayloadSerializerParams( + codegenContext, + payloadGenerator, + "output", + operationShape, + ), + ), + )(this) + } } } ?: run { val payloadGenerator = ServerHttpBoundProtocolPayloadGenerator(codegenContext, protocol) @@ -707,7 +735,7 @@ class ServerHttpBoundProtocolTraitImplGenerator( rustTemplate( """ #{SmithyHttpServer}::protocol::content_type_header_classifier( - &parts.headers, + &parts.headers, Some("$expectedRequestContentType"), )?; input = #{parser}(bytes.as_ref(), input)?; diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerProtocolLoader.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerProtocolLoader.kt index e52d9e3a3b..92dde3b2f6 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerProtocolLoader.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerProtocolLoader.kt @@ -9,21 +9,68 @@ import software.amazon.smithy.aws.traits.protocols.AwsJson1_0Trait import software.amazon.smithy.aws.traits.protocols.AwsJson1_1Trait import software.amazon.smithy.aws.traits.protocols.RestJson1Trait import software.amazon.smithy.aws.traits.protocols.RestXmlTrait +import software.amazon.smithy.rust.codegen.core.rustlang.Writable +import software.amazon.smithy.rust.codegen.core.rustlang.withBlockTemplate +import software.amazon.smithy.rust.codegen.core.rustlang.writable +import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType import software.amazon.smithy.rust.codegen.core.smithy.protocols.AwsJsonVersion import software.amazon.smithy.rust.codegen.core.smithy.protocols.ProtocolLoader import software.amazon.smithy.rust.codegen.core.smithy.protocols.ProtocolMap +import software.amazon.smithy.rust.codegen.core.util.isOutputEventStream import software.amazon.smithy.rust.codegen.server.smithy.ServerCodegenContext import software.amazon.smithy.rust.codegen.server.smithy.generators.protocol.ServerProtocolGenerator +class StreamPayloadSerializerCustomization() : ServerHttpBoundProtocolCustomization() { + override fun section(section: ServerHttpBoundProtocolSection): Writable = when (section) { + is ServerHttpBoundProtocolSection.WrapStreamPayload -> writable { + if (section.params.shape.isOutputEventStream(section.params.codegenContext.model)) { + // Event stream payload, of type `aws_smithy_http::event_stream::MessageStreamAdapter`, already + // implements the `Stream` trait, so no need to wrap it in the new-type. + section.params.payloadGenerator.generatePayload(this, section.params.shapeName, section.params.shape) + } else { + // Otherwise, the stream payload is `aws_smithy_http::byte_stream::ByteStream`. We wrap it in the + // new-type to enable the `Stream` trait. + withBlockTemplate( + "#{FuturesStreamCompatByteStream}::new(", + ")", + "FuturesStreamCompatByteStream" to RuntimeType.futuresStreamCompatByteStream(section.params.codegenContext.runtimeConfig), + ) { + section.params.payloadGenerator.generatePayload( + this, + section.params.shapeName, + section.params.shape, + ) + } + } + } + + else -> emptySection + } +} + class ServerProtocolLoader(supportedProtocols: ProtocolMap) : ProtocolLoader(supportedProtocols) { companion object { val DefaultProtocols = mapOf( - RestJson1Trait.ID to ServerRestJsonFactory(), - RestXmlTrait.ID to ServerRestXmlFactory(), - AwsJson1_0Trait.ID to ServerAwsJsonFactory(AwsJsonVersion.Json10), - AwsJson1_1Trait.ID to ServerAwsJsonFactory(AwsJsonVersion.Json11), + RestJson1Trait.ID to ServerRestJsonFactory( + additionalServerHttpBoundProtocolCustomizations = listOf( + StreamPayloadSerializerCustomization(), + ), + ), + RestXmlTrait.ID to ServerRestXmlFactory( + additionalServerHttpBoundProtocolCustomizations = listOf( + StreamPayloadSerializerCustomization(), + ), + ), + AwsJson1_0Trait.ID to ServerAwsJsonFactory( + AwsJsonVersion.Json10, + additionalServerHttpBoundProtocolCustomizations = listOf(StreamPayloadSerializerCustomization()), + ), + AwsJson1_1Trait.ID to ServerAwsJsonFactory( + AwsJsonVersion.Json11, + additionalServerHttpBoundProtocolCustomizations = listOf(StreamPayloadSerializerCustomization()), + ), ) } } diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerRestXmlFactory.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerRestXmlFactory.kt index f5b3be454f..9207c56046 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerRestXmlFactory.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerRestXmlFactory.kt @@ -15,11 +15,17 @@ import software.amazon.smithy.rust.codegen.server.smithy.generators.protocol.Ser * RestXml server-side protocol factory. This factory creates the [ServerHttpProtocolGenerator] * with RestXml specific configurations. */ -class ServerRestXmlFactory : ProtocolGeneratorFactory { +class ServerRestXmlFactory( + private val additionalServerHttpBoundProtocolCustomizations: List = listOf(), +) : ProtocolGeneratorFactory { override fun protocol(codegenContext: ServerCodegenContext): Protocol = ServerRestXmlProtocol(codegenContext) override fun buildProtocolGenerator(codegenContext: ServerCodegenContext): ServerHttpBoundProtocolGenerator = - ServerHttpBoundProtocolGenerator(codegenContext, ServerRestXmlProtocol(codegenContext)) + ServerHttpBoundProtocolGenerator( + codegenContext, + ServerRestXmlProtocol(codegenContext), + additionalServerHttpBoundProtocolCustomizations, + ) override fun support(): ProtocolSupport { return ProtocolSupport( diff --git a/rust-runtime/aws-smithy-http-server-python/src/types.rs b/rust-runtime/aws-smithy-http-server-python/src/types.rs index a2fa308512..a274efe086 100644 --- a/rust-runtime/aws-smithy-http-server-python/src/types.rs +++ b/rust-runtime/aws-smithy-http-server-python/src/types.rs @@ -31,7 +31,6 @@ use pyo3::{ prelude::*, }; use tokio::{runtime::Handle, sync::Mutex}; -use tokio_stream::StreamExt; use crate::PyError; diff --git a/rust-runtime/aws-smithy-http/src/byte_stream.rs b/rust-runtime/aws-smithy-http/src/byte_stream.rs index e067018a9d..e648431749 100644 --- a/rust-runtime/aws-smithy-http/src/byte_stream.rs +++ b/rust-runtime/aws-smithy-http/src/byte_stream.rs @@ -48,7 +48,8 @@ //! //! ### Stream a ByteStream into a file //! The previous example is recommended in cases where loading the entire file into memory first is desirable. For extremely large -//! files, you may wish to stream the data directly to the file system, chunk by chunk. This is posible using the `futures::Stream` implementation. +//! files, you may wish to stream the data directly to the file system, chunk by chunk. +//! This is possible using the [`.next()`](crate::byte_stream::ByteStream::next) method. //! //! ```no_run //! use bytes::{Buf, Bytes}; @@ -128,6 +129,7 @@ use bytes::Bytes; use bytes_utils::SegmentedBuf; use http_body::Body; use pin_project_lite::pin_project; +use std::future::poll_fn; use std::io::IoSlice; use std::pin::Pin; use std::task::{Context, Poll}; @@ -166,9 +168,7 @@ pin_project! { /// println!("first chunk: {:?}", data.chunk()); /// } /// ``` - /// 2. Via [`impl Stream`](futures_core::Stream): - /// - /// _Note: An import of `StreamExt` is required to use `.try_next()`._ + /// 2. Via [`.next()`](crate::byte_stream::ByteStream::next) or [`.try_next()`](crate::byte_stream::ByteStream::try_next): /// /// For use-cases where holding the entire ByteStream in memory is unnecessary, use the /// `Stream` implementation: @@ -183,7 +183,6 @@ pin_project! { /// # } /// use aws_smithy_http::byte_stream::{ByteStream, AggregatedBytes, error::Error}; /// use aws_smithy_http::body::SdkBody; - /// use tokio_stream::StreamExt; /// /// async fn example() -> Result<(), Error> { /// let mut stream = ByteStream::from(vec![1, 2, 3, 4, 5, 99]); @@ -276,7 +275,7 @@ impl ByteStream { } } - /// Consumes the ByteStream, returning the wrapped SdkBody + /// Consume the `ByteStream`, returning the wrapped SdkBody. // Backwards compatibility note: Because SdkBody has a dyn variant, // we will always be able to implement this method, even if we stop using // SdkBody as the internal representation @@ -284,6 +283,37 @@ impl ByteStream { self.inner.body } + /// Return the next item in the `ByteStream`. + /// + /// There is also a sibling method [`try_next`](ByteStream::try_next), which returns a `Result, Error>` + /// instead of an `Option>`. + pub async fn next(&mut self) -> Option> { + Some(self.inner.next().await?.map_err(Error::streaming)) + } + + /// Attempt to pull out the next value of this stream, returning `None` if the stream is + /// exhausted. + pub fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + self.project().inner.poll_next(cx).map_err(Error::streaming) + } + + /// Consume and return the next item in the `ByteStream` or return an error if an error is + /// encountered. + /// + /// Similar to the [`next`](ByteStream::next) method, but this returns a `Result, Error>` rather than + /// an `Option>`, making for easy use with the `?` operator. + pub async fn try_next(&mut self) -> Result, Error> { + self.next().await.transpose() + } + + /// Return the bounds on the remaining length of the `ByteStream`. + pub fn size_hint(&self) -> (u64, Option) { + self.inner.size_hint() + } + /// Read all the data from this `ByteStream` into memory /// /// If an error in the underlying stream is encountered, `ByteStreamError` is returned. @@ -393,7 +423,9 @@ impl ByteStream { /// # } /// ``` pub fn into_async_read(self) -> impl tokio::io::AsyncRead { - tokio_util::io::StreamReader::new(self) + tokio_util::io::StreamReader::new( + crate::futures_stream_adapter::FuturesStreamCompatByteStream::new(self), + ) } /// Given a function to modify an [`SdkBody`], run it on the `SdkBody` inside this `Bytestream`. @@ -442,18 +474,6 @@ impl From for ByteStream { } } -impl futures_core::stream::Stream for ByteStream { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_next(cx).map_err(Error::streaming) - } - - fn size_hint(&self) -> (usize, Option) { - self.inner.size_hint() - } -} - /// Non-contiguous Binary Data Storage /// /// When data is read from the network, it is read in a sequence of chunks that are not in @@ -524,6 +544,25 @@ impl Inner { Self { body } } + async fn next(&mut self) -> Option> + where + Self: Unpin, + B: http_body::Body, + { + let mut me = Pin::new(self); + poll_fn(|cx| me.as_mut().poll_next(cx)).await + } + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> + where + B: http_body::Body, + { + self.project().body.poll_data(cx) + } + async fn collect(self) -> Result where B: http_body::Body, @@ -536,34 +575,13 @@ impl Inner { } Ok(AggregatedBytes(output)) } -} - -const SIZE_HINT_32_BIT_PANIC_MESSAGE: &str = r#" -You're running a 32-bit system and this stream's length is too large to be represented with a usize. -Please limit stream length to less than 4.294Gb or run this program on a 64-bit computer architecture. -"#; - -impl futures_core::stream::Stream for Inner -where - B: http_body::Body, -{ - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().body.poll_data(cx) - } - fn size_hint(&self) -> (usize, Option) { + fn size_hint(&self) -> (u64, Option) + where + B: http_body::Body, + { let size_hint = http_body::Body::size_hint(&self.body); - let lower = size_hint.lower().try_into(); - let upper = size_hint.upper().map(|u| u.try_into()).transpose(); - - match (lower, upper) { - (Ok(lower), Ok(upper)) => (lower, upper), - (Err(_), _) | (_, Err(_)) => { - panic!("{}", SIZE_HINT_32_BIT_PANIC_MESSAGE) - } - } + (size_hint.lower(), size_hint.upper()) } } diff --git a/rust-runtime/aws-smithy-http/src/futures_stream_adapter.rs b/rust-runtime/aws-smithy-http/src/futures_stream_adapter.rs new file mode 100644 index 0000000000..0e718df493 --- /dev/null +++ b/rust-runtime/aws-smithy-http/src/futures_stream_adapter.rs @@ -0,0 +1,61 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use crate::body::SdkBody; +use crate::byte_stream::error::Error as ByteStreamError; +use crate::byte_stream::ByteStream; +use bytes::Bytes; +use futures_core::stream::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A new-type wrapper to enable the impl of the `futures_core::stream::Stream` trait +/// +/// [`ByteStream`] no longer implements `futures_core::stream::Stream` so we wrap it in the +/// new-type to enable the trait when it is required. +/// +/// This is meant to be used by codegen code, and users should not need to use it directly. +pub struct FuturesStreamCompatByteStream(ByteStream); + +impl FuturesStreamCompatByteStream { + /// Creates a new `FuturesStreamCompatByteStream` by wrapping `stream`. + pub fn new(stream: ByteStream) -> Self { + Self(stream) + } + + /// Returns [`SdkBody`] of the wrapped [`ByteStream`]. + pub fn into_inner(self) -> SdkBody { + self.0.into_inner() + } +} + +impl Stream for FuturesStreamCompatByteStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_next(cx) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures_core::stream::Stream; + + fn check_compatible_with_hyper_wrap_stream(stream: S) -> S + where + S: Stream> + Send + 'static, + O: Into + 'static, + E: Into> + 'static, + { + stream + } + + #[test] + fn test_byte_stream_stream_can_be_made_compatible_with_hyper_wrap_stream() { + let stream = ByteStream::from_static(b"Hello world"); + check_compatible_with_hyper_wrap_stream(FuturesStreamCompatByteStream::new(stream)); + } +} diff --git a/rust-runtime/aws-smithy-http/src/lib.rs b/rust-runtime/aws-smithy-http/src/lib.rs index b24ecd3bf1..52a259b422 100644 --- a/rust-runtime/aws-smithy-http/src/lib.rs +++ b/rust-runtime/aws-smithy-http/src/lib.rs @@ -27,6 +27,12 @@ pub mod body; pub mod endpoint; +// Marked as `doc(hidden)` because a type in the module is used both by this crate and by the code +// generator, but not by external users. Also, by the module being `doc(hidden)` instead of it being +// in `rust-runtime/inlineable`, each user won't have to pay the cost of running the module's tests +// when compiling their generated SDK. +#[doc(hidden)] +pub mod futures_stream_adapter; pub mod header; pub mod http; pub mod label;