From 9b9757c64bb3d8796a4911b80c77f6b4fd598b31 Mon Sep 17 00:00:00 2001 From: John DiSanti Date: Fri, 13 Oct 2023 17:59:13 -0700 Subject: [PATCH 1/4] Simplify default config and add default async sleep --- .../aws-config/src/imds/client.rs | 7 +- .../ResiliencyConfigCustomization.kt | 88 +----- .../customize/RequiredCustomizations.kt | 2 - .../client/FluentClientGenerator.kt | 32 ++- .../ResiliencyConfigCustomizationTest.kt | 2 +- rust-runtime/aws-smithy-runtime/src/client.rs | 2 + .../aws-smithy-runtime/src/client/defaults.rs | 92 +++++++ .../aws-smithy-runtime/src/client/http.rs | 20 -- .../src/client/orchestrator/operation.rs | 13 +- .../aws-smithy-runtime/src/client/retries.rs | 15 +- .../src/client/retries/client_rate_limiter.rs | 31 --- .../src/client/retries/strategy/standard.rs | 259 ++++++++++-------- .../src/client/retries/token_bucket.rs | 42 +-- rust-runtime/aws-smithy-types/src/retry.rs | 6 + 14 files changed, 287 insertions(+), 324 deletions(-) create mode 100644 rust-runtime/aws-smithy-runtime/src/client/defaults.rs diff --git a/aws/rust-runtime/aws-config/src/imds/client.rs b/aws/rust-runtime/aws-config/src/imds/client.rs index 3b3513f627..7f4a99afd7 100644 --- a/aws/rust-runtime/aws-config/src/imds/client.rs +++ b/aws/rust-runtime/aws-config/src/imds/client.rs @@ -236,13 +236,14 @@ impl ImdsCommonRuntimePlugin { fn new( config: &ProviderConfig, endpoint_resolver: ImdsEndpointResolver, - retry_config: &RetryConfig, + retry_config: RetryConfig, timeout_config: TimeoutConfig, ) -> Self { let mut layer = Layer::new("ImdsCommonRuntimePlugin"); layer.store_put(AuthSchemeOptionResolverParams::new(())); layer.store_put(EndpointResolverParams::new(())); layer.store_put(SensitiveOutput); + layer.store_put(retry_config); layer.store_put(timeout_config); layer.store_put(user_agent()); @@ -253,7 +254,7 @@ impl ImdsCommonRuntimePlugin { .with_endpoint_resolver(Some(endpoint_resolver)) .with_interceptor(UserAgentInterceptor::new()) .with_retry_classifier(SharedRetryClassifier::new(ImdsResponseRetryClassifier)) - .with_retry_strategy(Some(StandardRetryStrategy::new(retry_config))) + .with_retry_strategy(Some(StandardRetryStrategy::new())) .with_time_source(Some(config.time_source())) .with_sleep_impl(config.sleep_impl()), } @@ -421,7 +422,7 @@ impl Builder { let common_plugin = SharedRuntimePlugin::new(ImdsCommonRuntimePlugin::new( &config, endpoint_resolver, - &retry_config, + retry_config, timeout_config, )); let operation = Operation::builder() diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customizations/ResiliencyConfigCustomization.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customizations/ResiliencyConfigCustomization.kt index 364185b602..e44fce767f 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customizations/ResiliencyConfigCustomization.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customizations/ResiliencyConfigCustomization.kt @@ -7,20 +7,16 @@ package software.amazon.smithy.rust.codegen.client.smithy.customizations import software.amazon.smithy.rust.codegen.client.smithy.ClientCodegenContext import software.amazon.smithy.rust.codegen.client.smithy.ClientRustModule -import software.amazon.smithy.rust.codegen.client.smithy.generators.ServiceRuntimePluginCustomization -import software.amazon.smithy.rust.codegen.client.smithy.generators.ServiceRuntimePluginSection import software.amazon.smithy.rust.codegen.client.smithy.generators.config.ConfigCustomization import software.amazon.smithy.rust.codegen.client.smithy.generators.config.ServiceConfig import software.amazon.smithy.rust.codegen.core.rustlang.Attribute -import software.amazon.smithy.rust.codegen.core.rustlang.Writable import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate import software.amazon.smithy.rust.codegen.core.rustlang.writable import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType.Companion.preludeScope import software.amazon.smithy.rust.codegen.core.smithy.RustCrate -import software.amazon.smithy.rust.codegen.core.util.sdkId -class ResiliencyConfigCustomization(private val codegenContext: ClientCodegenContext) : ConfigCustomization() { +class ResiliencyConfigCustomization(codegenContext: ClientCodegenContext) : ConfigCustomization() { private val runtimeConfig = codegenContext.runtimeConfig private val retryConfig = RuntimeType.smithyTypes(runtimeConfig).resolve("retry") private val sleepModule = RuntimeType.smithyAsync(runtimeConfig).resolve("rt::sleep") @@ -44,8 +40,6 @@ class ResiliencyConfigCustomization(private val codegenContext: ClientCodegenCon "StandardRetryStrategy" to retries.resolve("strategy::StandardRetryStrategy"), "SystemTime" to RuntimeType.std.resolve("time::SystemTime"), "TimeoutConfig" to timeoutModule.resolve("TimeoutConfig"), - "TokenBucket" to retries.resolve("TokenBucket"), - "TokenBucketPartition" to retries.resolve("TokenBucketPartition"), ) override fun section(section: ServiceConfig) = @@ -281,57 +275,6 @@ class ResiliencyConfigCustomization(private val codegenContext: ClientCodegenCon ) } - is ServiceConfig.BuilderBuild -> { - rustTemplate( - """ - if layer.load::<#{RetryConfig}>().is_none() { - layer.store_put(#{RetryConfig}::disabled()); - } - let retry_config = layer.load::<#{RetryConfig}>().expect("set to default above").clone(); - - if layer.load::<#{RetryPartition}>().is_none() { - layer.store_put(#{RetryPartition}::new("${codegenContext.serviceShape.sdkId()}")); - } - let retry_partition = layer.load::<#{RetryPartition}>().expect("set to default above").clone(); - - if retry_config.has_retry() { - #{debug}!("using retry strategy with partition '{}'", retry_partition); - } - - if retry_config.mode() == #{RetryMode}::Adaptive { - if let #{Some}(time_source) = self.runtime_components.time_source() { - let seconds_since_unix_epoch = time_source - .now() - .duration_since(#{SystemTime}::UNIX_EPOCH) - .expect("the present takes place after the UNIX_EPOCH") - .as_secs_f64(); - let client_rate_limiter_partition = #{ClientRateLimiterPartition}::new(retry_partition.clone()); - let client_rate_limiter = CLIENT_RATE_LIMITER.get_or_init(client_rate_limiter_partition, || { - #{ClientRateLimiter}::new(seconds_since_unix_epoch) - }); - layer.store_put(client_rate_limiter); - } - } - - // The token bucket is used for both standard AND adaptive retries. - let token_bucket_partition = #{TokenBucketPartition}::new(retry_partition); - let token_bucket = TOKEN_BUCKET.get_or_init(token_bucket_partition, #{TokenBucket}::default); - layer.store_put(token_bucket); - - // TODO(enableNewSmithyRuntimeCleanup): Should not need to provide a default once smithy-rs##2770 - // is resolved - if layer.load::<#{TimeoutConfig}>().is_none() { - layer.store_put(#{TimeoutConfig}::disabled()); - } - - self.runtime_components.set_retry_strategy(#{Some}( - #{SharedRetryStrategy}::new(#{StandardRetryStrategy}::new(&retry_config))) - ); - """, - *codegenScope, - ) - } - else -> emptySection } } @@ -366,32 +309,3 @@ class ResiliencyReExportCustomization(codegenContext: ClientCodegenContext) { } } } - -class ResiliencyServiceRuntimePluginCustomization(codegenContext: ClientCodegenContext) : ServiceRuntimePluginCustomization() { - private val runtimeConfig = codegenContext.runtimeConfig - private val smithyRuntime = RuntimeType.smithyRuntime(runtimeConfig) - private val retries = smithyRuntime.resolve("client::retries") - private val codegenScope = arrayOf( - "TokenBucket" to retries.resolve("TokenBucket"), - "TokenBucketPartition" to retries.resolve("TokenBucketPartition"), - "ClientRateLimiter" to retries.resolve("ClientRateLimiter"), - "ClientRateLimiterPartition" to retries.resolve("ClientRateLimiterPartition"), - "StaticPartitionMap" to smithyRuntime.resolve("static_partition_map::StaticPartitionMap"), - ) - - override fun section(section: ServiceRuntimePluginSection): Writable = writable { - when (section) { - is ServiceRuntimePluginSection.DeclareSingletons -> { - rustTemplate( - """ - static TOKEN_BUCKET: #{StaticPartitionMap}<#{TokenBucketPartition}, #{TokenBucket}> = #{StaticPartitionMap}::new(); - static CLIENT_RATE_LIMITER: #{StaticPartitionMap}<#{ClientRateLimiterPartition}, #{ClientRateLimiter}> = #{StaticPartitionMap}::new(); - """, - *codegenScope, - ) - } - - else -> emptySection - } - } -} diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customize/RequiredCustomizations.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customize/RequiredCustomizations.kt index a1f11cf4a2..c787164070 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customize/RequiredCustomizations.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customize/RequiredCustomizations.kt @@ -15,7 +15,6 @@ import software.amazon.smithy.rust.codegen.client.smithy.customizations.Intercep import software.amazon.smithy.rust.codegen.client.smithy.customizations.MetadataCustomization import software.amazon.smithy.rust.codegen.client.smithy.customizations.ResiliencyConfigCustomization import software.amazon.smithy.rust.codegen.client.smithy.customizations.ResiliencyReExportCustomization -import software.amazon.smithy.rust.codegen.client.smithy.customizations.ResiliencyServiceRuntimePluginCustomization import software.amazon.smithy.rust.codegen.client.smithy.customizations.RetryClassifierConfigCustomization import software.amazon.smithy.rust.codegen.client.smithy.customizations.RetryClassifierOperationCustomization import software.amazon.smithy.rust.codegen.client.smithy.customizations.RetryClassifierServiceRuntimePluginCustomization @@ -115,7 +114,6 @@ class RequiredCustomizations : ClientCodegenDecorator { codegenContext: ClientCodegenContext, baseCustomizations: List, ): List = baseCustomizations + - ResiliencyServiceRuntimePluginCustomization(codegenContext) + ConnectionPoisoningRuntimePluginCustomization(codegenContext) + RetryClassifierServiceRuntimePluginCustomization(codegenContext) } diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt index 9166ccd598..06d2c52b49 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt @@ -40,7 +40,6 @@ import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate import software.amazon.smithy.rust.codegen.core.rustlang.stripOuter import software.amazon.smithy.rust.codegen.core.rustlang.withBlockTemplate import software.amazon.smithy.rust.codegen.core.rustlang.writable -import software.amazon.smithy.rust.codegen.core.smithy.RuntimeConfig import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType.Companion.preludeScope import software.amazon.smithy.rust.codegen.core.smithy.RustCrate @@ -50,9 +49,11 @@ import software.amazon.smithy.rust.codegen.core.smithy.expectRustMetadata import software.amazon.smithy.rust.codegen.core.smithy.generators.getterName import software.amazon.smithy.rust.codegen.core.smithy.generators.setterName import software.amazon.smithy.rust.codegen.core.smithy.rustType +import software.amazon.smithy.rust.codegen.core.util.dq import software.amazon.smithy.rust.codegen.core.util.inputShape import software.amazon.smithy.rust.codegen.core.util.orNull import software.amazon.smithy.rust.codegen.core.util.outputShape +import software.amazon.smithy.rust.codegen.core.util.sdkId import software.amazon.smithy.rust.codegen.core.util.toSnakeCase class FluentClientGenerator( @@ -161,7 +162,7 @@ class FluentClientGenerator( } """, *clientScope, - "base_client_runtime_plugins" to baseClientRuntimePluginsFn(runtimeConfig), + "base_client_runtime_plugins" to baseClientRuntimePluginsFn(codegenContext), ) } @@ -446,8 +447,10 @@ class FluentClientGenerator( } } -private fun baseClientRuntimePluginsFn(runtimeConfig: RuntimeConfig): RuntimeType = +private fun baseClientRuntimePluginsFn(codegenContext: ClientCodegenContext): RuntimeType = codegenContext.runtimeConfig.let { rc -> RuntimeType.forInlineFun("base_client_runtime_plugins", ClientRustModule.config) { + val api = RuntimeType.smithyRuntimeApi(rc) + val rt = RuntimeType.smithyRuntime(rc) rustTemplate( """ pub(crate) fn base_client_runtime_plugins( @@ -456,12 +459,19 @@ private fun baseClientRuntimePluginsFn(runtimeConfig: RuntimeConfig): RuntimeTyp let mut configured_plugins = #{Vec}::new(); ::std::mem::swap(&mut config.runtime_plugins, &mut configured_plugins); let mut plugins = #{RuntimePlugins}::new() + // defaults .with_client_plugin(#{default_http_client_plugin}()) + .with_client_plugin(#{default_retry_config_plugin}(${codegenContext.serviceShape.sdkId().dq()})) + .with_client_plugin(#{default_sleep_impl_plugin}()) + .with_client_plugin(#{default_time_source_plugin}()) + .with_client_plugin(#{default_timeout_config_plugin}()) + // user config .with_client_plugin( #{StaticRuntimePlugin}::new() .with_config(config.config.clone()) .with_runtime_components(config.runtime_components.clone()) ) + // codegen config .with_client_plugin(crate::config::ServiceRuntimePlugin::new(config)) .with_client_plugin(#{NoAuthRuntimePlugin}::new()); for plugin in configured_plugins { @@ -471,15 +481,17 @@ private fun baseClientRuntimePluginsFn(runtimeConfig: RuntimeConfig): RuntimeTyp } """, *preludeScope, - "RuntimePlugins" to RuntimeType.runtimePlugins(runtimeConfig), - "NoAuthRuntimePlugin" to RuntimeType.smithyRuntime(runtimeConfig) - .resolve("client::auth::no_auth::NoAuthRuntimePlugin"), - "StaticRuntimePlugin" to RuntimeType.smithyRuntimeApi(runtimeConfig) - .resolve("client::runtime_plugin::StaticRuntimePlugin"), - "default_http_client_plugin" to RuntimeType.smithyRuntime(runtimeConfig) - .resolve("client::http::default_http_client_plugin"), + "default_http_client_plugin" to rt.resolve("client::defaults::default_http_client_plugin"), + "default_retry_config_plugin" to rt.resolve("client::defaults::default_retry_config_plugin"), + "default_sleep_impl_plugin" to rt.resolve("client::defaults::default_sleep_impl_plugin"), + "default_timeout_config_plugin" to rt.resolve("client::defaults::default_timeout_config_plugin"), + "default_time_source_plugin" to rt.resolve("client::defaults::default_time_source_plugin"), + "NoAuthRuntimePlugin" to rt.resolve("client::auth::no_auth::NoAuthRuntimePlugin"), + "RuntimePlugins" to RuntimeType.runtimePlugins(rc), + "StaticRuntimePlugin" to api.resolve("client::runtime_plugin::StaticRuntimePlugin"), ) } +} /** * For a given `operation` shape, return a list of strings where each string describes the name and input type of one of diff --git a/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customizations/ResiliencyConfigCustomizationTest.kt b/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customizations/ResiliencyConfigCustomizationTest.kt index b25e28c75e..4a7a8607e1 100644 --- a/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customizations/ResiliencyConfigCustomizationTest.kt +++ b/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customizations/ResiliencyConfigCustomizationTest.kt @@ -45,7 +45,7 @@ internal class ResiliencyConfigCustomizationTest { project.withModule(ClientRustModule.config) { ServiceRuntimePluginGenerator(codegenContext).render( this, - listOf(ResiliencyServiceRuntimePluginCustomization(codegenContext)), + emptyList(), ) } ResiliencyReExportCustomization(codegenContext).extras(project) diff --git a/rust-runtime/aws-smithy-runtime/src/client.rs b/rust-runtime/aws-smithy-runtime/src/client.rs index 392dc12023..4457ef2ac8 100644 --- a/rust-runtime/aws-smithy-runtime/src/client.rs +++ b/rust-runtime/aws-smithy-runtime/src/client.rs @@ -6,6 +6,8 @@ /// Smithy auth scheme implementations. pub mod auth; +pub mod defaults; + pub mod dns; /// Built-in Smithy HTTP clients and connectors. diff --git a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs new file mode 100644 index 0000000000..a61372554f --- /dev/null +++ b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs @@ -0,0 +1,92 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Runtime plugins that provide defaults for clients. +//! +//! Note: these are the absolute base-level defaults. They may not be the defaults +//! for _your_ client, since many things can change these defaults on the way to +//! code generating and constructing a full client. + +use crate::client::retries::strategy::StandardRetryStrategy; +use crate::client::retries::RetryPartition; +use aws_smithy_async::rt::sleep::default_async_sleep; +use aws_smithy_async::time::SystemTimeSource; +use aws_smithy_runtime_api::client::http::SharedHttpClient; +use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder; +use aws_smithy_runtime_api::client::runtime_plugin::{ + Order, SharedRuntimePlugin, StaticRuntimePlugin, +}; +use aws_smithy_runtime_api::shared::IntoShared; +use aws_smithy_types::config_bag::Layer; +use aws_smithy_types::retry::RetryConfig; +use aws_smithy_types::timeout::TimeoutConfig; +use std::borrow::Cow; + +/// Runtime plugin that provides a default connector. +pub fn default_http_client_plugin() -> SharedRuntimePlugin { + let _default: Option = None; + #[cfg(feature = "connector-hyper-0-14-x")] + let _default = crate::client::http::hyper_014::default_client(); + + StaticRuntimePlugin::new() + .with_order(Order::Defaults) + .with_runtime_components( + RuntimeComponentsBuilder::new("default_http_client_plugin").with_http_client(_default), + ) + .into_shared() +} + +/// Runtime plugin that provides a default async sleep implementation. +pub fn default_sleep_impl_plugin() -> SharedRuntimePlugin { + StaticRuntimePlugin::new() + .with_order(Order::Defaults) + .with_runtime_components( + RuntimeComponentsBuilder::new("default_sleep_impl_plugin") + .with_sleep_impl(default_async_sleep()), + ) + .into_shared() +} + +/// Runtime plugin that provides a default time source. +pub fn default_time_source_plugin() -> SharedRuntimePlugin { + StaticRuntimePlugin::new() + .with_order(Order::Defaults) + .with_runtime_components( + RuntimeComponentsBuilder::new("default_time_source_plugin") + .with_time_source(Some(SystemTimeSource::new())), + ) + .into_shared() +} + +/// Runtime plugin that sets the default retry strategy, config (disabled), and partition. +pub fn default_retry_config_plugin( + default_partition_name: impl Into>, +) -> SharedRuntimePlugin { + StaticRuntimePlugin::new() + .with_order(Order::Defaults) + .with_runtime_components( + RuntimeComponentsBuilder::new("default_retry_config_plugin") + .with_retry_strategy(Some(StandardRetryStrategy::new())), + ) + .with_config({ + let mut layer = Layer::new("default_retry_config"); + layer.store_put(RetryConfig::disabled()); + layer.store_put(RetryPartition::new(default_partition_name)); + layer.freeze() + }) + .into_shared() +} + +/// Runtime plugin that sets the default timeout config (no timeouts). +pub fn default_timeout_config_plugin() -> SharedRuntimePlugin { + StaticRuntimePlugin::new() + .with_order(Order::Defaults) + .with_config({ + let mut layer = Layer::new("default_timeout_config"); + layer.store_put(TimeoutConfig::disabled()); + layer.freeze() + }) + .into_shared() +} diff --git a/rust-runtime/aws-smithy-runtime/src/client/http.rs b/rust-runtime/aws-smithy-runtime/src/client/http.rs index 6f5960d073..4f02c42870 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/http.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/http.rs @@ -3,12 +3,6 @@ * SPDX-License-Identifier: Apache-2.0 */ -use aws_smithy_runtime_api::client::http::SharedHttpClient; -use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder; -use aws_smithy_runtime_api::client::runtime_plugin::{ - Order, SharedRuntimePlugin, StaticRuntimePlugin, -}; - /// Interceptor for connection poisoning. pub mod connection_poisoning; @@ -21,17 +15,3 @@ pub mod test_util; /// needing to provide equivalent functionality for hyper 1.x in the future. #[cfg(feature = "connector-hyper-0-14-x")] pub mod hyper_014; - -/// Runtime plugin that provides a default connector. Intended to be used by the generated code. -pub fn default_http_client_plugin() -> SharedRuntimePlugin { - let _default: Option = None; - #[cfg(feature = "connector-hyper-0-14-x")] - let _default = hyper_014::default_client(); - - let plugin = StaticRuntimePlugin::new() - .with_order(Order::Defaults) - .with_runtime_components( - RuntimeComponentsBuilder::new("default_http_client_plugin").with_http_client(_default), - ); - SharedRuntimePlugin::new(plugin) -} diff --git a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs index a90674ab43..883e8fb4e9 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs @@ -4,8 +4,11 @@ */ use crate::client::auth::no_auth::{NoAuthScheme, NO_AUTH_SCHEME_ID}; +use crate::client::defaults::{ + default_http_client_plugin, default_retry_config_plugin, default_sleep_impl_plugin, + default_time_source_plugin, default_timeout_config_plugin, +}; use crate::client::http::connection_poisoning::ConnectionPoisoningInterceptor; -use crate::client::http::default_http_client_plugin; use crate::client::identity::no_auth::NoAuthIdentityResolver; use crate::client::orchestrator::endpoints::StaticUriEndpointResolver; use crate::client::retries::strategy::{NeverRetryStrategy, StandardRetryStrategy}; @@ -222,9 +225,7 @@ impl OperationBuilder { pub fn standard_retry(mut self, retry_config: &RetryConfig) -> Self { self.config.store_put(retry_config.clone()); self.runtime_components - .set_retry_strategy(Some(SharedRetryStrategy::new(StandardRetryStrategy::new( - retry_config, - )))); + .set_retry_strategy(Some(SharedRetryStrategy::new(StandardRetryStrategy::new()))); self } @@ -325,6 +326,10 @@ impl OperationBuilder { let operation_name = self.operation_name.expect("operation_name required"); let mut runtime_plugins = RuntimePlugins::new() .with_client_plugin(default_http_client_plugin()) + .with_client_plugin(default_retry_config_plugin(service_name.clone())) + .with_client_plugin(default_sleep_impl_plugin()) + .with_client_plugin(default_time_source_plugin()) + .with_client_plugin(default_timeout_config_plugin()) .with_client_plugin( StaticRuntimePlugin::new() .with_config(self.config.freeze()) diff --git a/rust-runtime/aws-smithy-runtime/src/client/retries.rs b/rust-runtime/aws-smithy-runtime/src/client/retries.rs index 46bc4332c6..b275b23528 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/retries.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/retries.rs @@ -15,30 +15,29 @@ mod token_bucket; use aws_smithy_types::config_bag::{Storable, StoreReplace}; use std::fmt; -pub use client_rate_limiter::{ClientRateLimiter, ClientRateLimiterRuntimePlugin}; -pub use token_bucket::{TokenBucket, TokenBucketRuntimePlugin}; +pub use client_rate_limiter::ClientRateLimiter; +pub use token_bucket::TokenBucket; #[doc(hidden)] pub use client_rate_limiter::ClientRateLimiterPartition; -#[doc(hidden)] -pub use token_bucket::TokenBucketPartition; +use std::borrow::Cow; #[doc(hidden)] #[non_exhaustive] #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub struct RetryPartition { - inner: &'static str, + name: Cow<'static, str>, } impl RetryPartition { - pub fn new(name: &'static str) -> Self { - Self { inner: name } + pub fn new(name: impl Into>) -> Self { + Self { name: name.into() } } } impl fmt::Display for RetryPartition { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.inner) + f.write_str(&self.name) } } diff --git a/rust-runtime/aws-smithy-runtime/src/client/retries/client_rate_limiter.rs b/rust-runtime/aws-smithy-runtime/src/client/retries/client_rate_limiter.rs index 661f0f854c..3e010b0c60 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/retries/client_rate_limiter.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/retries/client_rate_limiter.rs @@ -9,38 +9,11 @@ #![allow(dead_code)] use crate::client::retries::RetryPartition; -use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugin; use aws_smithy_runtime_api::{builder, builder_methods, builder_struct}; -use aws_smithy_types::config_bag::{FrozenLayer, Layer, Storable, StoreReplace}; use std::sync::{Arc, Mutex}; use std::time::Duration; use tracing::debug; -/// A [`RuntimePlugin`] to provide a client rate limiter, usable by a retry strategy. -#[non_exhaustive] -#[derive(Debug)] -pub struct ClientRateLimiterRuntimePlugin { - rate_limiter: ClientRateLimiter, -} - -impl ClientRateLimiterRuntimePlugin { - /// Create a new [`ClientRateLimiterRuntimePlugin`]. - pub fn new(seconds_since_unix_epoch: f64) -> Self { - Self { - rate_limiter: ClientRateLimiter::new(seconds_since_unix_epoch), - } - } -} - -impl RuntimePlugin for ClientRateLimiterRuntimePlugin { - fn config(&self) -> Option { - let mut cfg = Layer::new("client rate limiter"); - cfg.store_put(self.rate_limiter.clone()); - - Some(cfg.freeze()) - } -} - #[doc(hidden)] #[non_exhaustive] #[derive(Clone, Debug, Hash, PartialEq, Eq)] @@ -104,10 +77,6 @@ pub(crate) enum RequestReason { InitialRequest, } -impl Storable for ClientRateLimiter { - type Storer = StoreReplace; -} - impl ClientRateLimiter { /// Creates a new [`ClientRateLimiter`]. pub fn new(seconds_since_unix_epoch: f64) -> Self { diff --git a/rust-runtime/aws-smithy-runtime/src/client/retries/strategy/standard.rs b/rust-runtime/aws-smithy-runtime/src/client/retries/strategy/standard.rs index 70efea493a..98d1308831 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/retries/strategy/standard.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/retries/strategy/standard.rs @@ -9,29 +9,26 @@ use crate::client::retries::strategy::standard::ReleaseResult::{ APermitWasReleased, NoPermitWasReleased, }; use crate::client::retries::token_bucket::TokenBucket; +use crate::client::retries::{ClientRateLimiterPartition, RetryPartition}; +use crate::static_partition_map::StaticPartitionMap; use aws_smithy_runtime_api::box_error::BoxError; use aws_smithy_runtime_api::client::interceptors::context::InterceptorContext; use aws_smithy_runtime_api::client::retries::classifiers::{RetryAction, RetryReason}; use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt}; use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents; use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace}; -use aws_smithy_types::retry::{ErrorKind, RetryConfig}; +use aws_smithy_types::retry::{ErrorKind, RetryConfig, RetryMode}; use std::sync::Mutex; use std::time::{Duration, SystemTime}; use tokio::sync::OwnedSemaphorePermit; use tracing::debug; -// The initial attempt, plus three retries. -const DEFAULT_MAX_ATTEMPTS: u32 = 4; +static CLIENT_RATE_LIMITER: StaticPartitionMap = + StaticPartitionMap::new(); /// Retry strategy with exponential backoff, max attempts, and a token bucket. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct StandardRetryStrategy { - // Retry settings - base: fn() -> f64, - initial_backoff: Duration, - max_attempts: u32, - max_backoff: Duration, retry_permit: Mutex>, } @@ -41,41 +38,8 @@ impl Storable for StandardRetryStrategy { impl StandardRetryStrategy { /// Create a new standard retry strategy with the given config. - pub fn new(retry_config: &RetryConfig) -> Self { - let base = if retry_config.use_static_exponential_base() { - || 1.0 - } else { - fastrand::f64 - }; - Self::default() - .with_base(base) - .with_max_backoff(retry_config.max_backoff()) - .with_max_attempts(retry_config.max_attempts()) - .with_initial_backoff(retry_config.initial_backoff()) - } - - /// Changes the exponential backoff base. - pub fn with_base(mut self, base: fn() -> f64) -> Self { - self.base = base; - self - } - - /// Changes the max number of attempts. - pub fn with_max_attempts(mut self, max_attempts: u32) -> Self { - self.max_attempts = max_attempts; - self - } - - /// Changes the initial backoff time. - pub fn with_initial_backoff(mut self, initial_backoff: Duration) -> Self { - self.initial_backoff = initial_backoff; - self - } - - /// Changes the maximum backoff time. - pub fn with_max_backoff(mut self, max_backoff: Duration) -> Self { - self.max_backoff = max_backoff; - self + pub fn new() -> Self { + Default::default() } fn release_retry_permit(&self) -> ReleaseResult { @@ -98,10 +62,37 @@ impl StandardRetryStrategy { } } + /// Returns a [`ClientRateLimiter`] if adaptive retry is configured. + fn adaptive_retry_rate_limiter( + runtime_components: &RuntimeComponents, + cfg: &ConfigBag, + ) -> Option { + let retry_config = cfg.load::().expect("retry config is required"); + if retry_config.mode() == RetryMode::Adaptive { + if let Some(time_source) = runtime_components.time_source() { + let retry_partition = cfg.load::().expect("set in default config"); + let seconds_since_unix_epoch = time_source + .now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("the present takes place after the UNIX_EPOCH") + .as_secs_f64(); + let client_rate_limiter_partition = + ClientRateLimiterPartition::new(retry_partition.clone()); + let client_rate_limiter = CLIENT_RATE_LIMITER + .get_or_init(client_rate_limiter_partition, || { + ClientRateLimiter::new(seconds_since_unix_epoch) + }); + return Some(client_rate_limiter); + } + } + None + } + fn calculate_backoff( &self, runtime_components: &RuntimeComponents, cfg: &ConfigBag, + retry_cfg: &RetryConfig, retry_reason: &RetryAction, ) -> Result { let request_attempts = cfg @@ -119,13 +110,13 @@ impl StandardRetryStrategy { ); if let Some(delay) = *retry_after { - let delay = delay.min(self.max_backoff); + let delay = delay.min(retry_cfg.max_backoff()); debug!("explicit request from server to delay {delay:?} before retrying"); Ok(delay) } else if let Some(delay) = check_rate_limiter_for_delay(runtime_components, cfg, *kind) { - let delay = delay.min(self.max_backoff); + let delay = delay.min(retry_cfg.max_backoff()); debug!("rate limiter has requested a {delay:?} delay before retrying"); Ok(delay) } else { @@ -139,23 +130,28 @@ impl StandardRetryStrategy { } } + let base = if retry_cfg.use_static_exponential_base() { + 1.0 + } else { + fastrand::f64() + }; let backoff = calculate_exponential_backoff( // Generate a random base multiplier to create jitter - (self.base)(), + base, // Get the backoff time multiplier in seconds (with fractional seconds) - self.initial_backoff.as_secs_f64(), + retry_cfg.initial_backoff().as_secs_f64(), // `self.local.attempts` tracks number of requests made including the initial request // The initial attempt shouldn't count towards backoff calculations so we subtract it request_attempts - 1, ); - Ok(Duration::from_secs_f64(backoff).min(self.max_backoff)) + Ok(Duration::from_secs_f64(backoff).min(retry_cfg.max_backoff())) } } RetryAction::RetryForbidden | RetryAction::NoActionIndicated => { update_rate_limiter_if_exists(runtime_components, cfg, false); debug!( attempts = request_attempts, - max_attempts = self.max_attempts, + max_attempts = retry_cfg.max_attempts(), "encountered unretryable error" ); Err(ShouldAttempt::No) @@ -170,26 +166,13 @@ enum ReleaseResult { NoPermitWasReleased, } -impl Default for StandardRetryStrategy { - fn default() -> Self { - Self { - max_attempts: DEFAULT_MAX_ATTEMPTS, - max_backoff: Duration::from_secs(20), - // by default, use a random base for exponential backoff - base: fastrand::f64, - initial_backoff: Duration::from_secs(1), - retry_permit: Mutex::new(None), - } - } -} - impl RetryStrategy for StandardRetryStrategy { fn should_attempt_initial_request( &self, runtime_components: &RuntimeComponents, cfg: &ConfigBag, ) -> Result { - if let Some(crl) = cfg.load::() { + if let Some(crl) = Self::adaptive_retry_rate_limiter(runtime_components, cfg) { let seconds_since_unix_epoch = get_seconds_since_unix_epoch(runtime_components); if let Err(delay) = crl.acquire_permission_to_send_a_request( seconds_since_unix_epoch, @@ -210,6 +193,7 @@ impl RetryStrategy for StandardRetryStrategy { runtime_components: &RuntimeComponents, cfg: &ConfigBag, ) -> Result { + let retry_cfg = cfg.load::().expect("retry config is required"); // Look a the result. If it's OK then we're done; No retry required. Otherwise, we need to inspect it let output_or_error = ctx.output_or_error().expect( "This must never be called without reaching the point where the result exists.", @@ -237,12 +221,12 @@ impl RetryStrategy for StandardRetryStrategy { .load::() .expect("at least one request attempt is made before any retry is attempted") .attempts(); - if request_attempts >= self.max_attempts { + if request_attempts >= retry_cfg.max_attempts() { update_rate_limiter_if_exists(runtime_components, cfg, false); debug!( attempts = request_attempts, - max_attempts = self.max_attempts, + max_attempts = retry_cfg.max_attempts(), "not retrying because we are out of attempts" ); return Ok(ShouldAttempt::No); @@ -253,11 +237,12 @@ impl RetryStrategy for StandardRetryStrategy { let classifier_result = run_classifiers_on_ctx(retry_classifiers, ctx); // Calculate the appropriate backoff time. - let backoff = match self.calculate_backoff(runtime_components, cfg, &classifier_result) { - Ok(value) => value, - // In some cases, backoff calculation will decide that we shouldn't retry at all. - Err(value) => return Ok(value), - }; + let backoff = + match self.calculate_backoff(runtime_components, cfg, retry_cfg, &classifier_result) { + Ok(value) => value, + // In some cases, backoff calculation will decide that we shouldn't retry at all. + Err(value) => return Ok(value), + }; debug!( "attempt #{request_attempts} failed with {:?}; retrying after {:?}", classifier_result, backoff, @@ -272,7 +257,7 @@ fn update_rate_limiter_if_exists( cfg: &ConfigBag, is_throttling_error: bool, ) { - if let Some(crl) = cfg.load::() { + if let Some(crl) = StandardRetryStrategy::adaptive_retry_rate_limiter(runtime_components, cfg) { let seconds_since_unix_epoch = get_seconds_since_unix_epoch(runtime_components); crl.update_rate_limiter(seconds_since_unix_epoch, is_throttling_error); } @@ -283,7 +268,7 @@ fn check_rate_limiter_for_delay( cfg: &ConfigBag, kind: ErrorKind, ) -> Option { - if let Some(crl) = cfg.load::() { + if let Some(crl) = StandardRetryStrategy::adaptive_retry_rate_limiter(runtime_components, cfg) { let retry_reason = if kind == ErrorKind::ThrottlingError { RequestReason::RetryTimeout } else { @@ -336,7 +321,11 @@ mod tests { #[test] fn no_retry_necessary_for_ok_result() { - let cfg = ConfigBag::base(); + let cfg = ConfigBag::of_layers(vec![{ + let mut layer = Layer::new("test"); + layer.store_put(RetryConfig::standard()); + layer + }]); let rc = RuntimeComponentsBuilder::for_tests().build().unwrap(); let mut ctx = InterceptorContext::new(Input::doesnt_matter()); let strategy = StandardRetryStrategy::default(); @@ -350,6 +339,7 @@ mod tests { fn set_up_cfg_and_context( error_kind: ErrorKind, current_request_attempts: u32, + retry_config: RetryConfig, ) -> (InterceptorContext, RuntimeComponents, ConfigBag) { let mut ctx = InterceptorContext::new(Input::doesnt_matter()); ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter"))); @@ -359,6 +349,7 @@ mod tests { .unwrap(); let mut layer = Layer::new("test"); layer.store_put(RequestAttempts::new(current_request_attempts)); + layer.store_put(retry_config); let cfg = ConfigBag::of_layers(vec![layer]); (ctx, rc, cfg) @@ -367,8 +358,14 @@ mod tests { // Test that error kinds produce the correct "retry after X seconds" output. // All error kinds are handled in the same way for the standard strategy. fn test_should_retry_error_kind(error_kind: ErrorKind) { - let (ctx, rc, cfg) = set_up_cfg_and_context(error_kind, 3); - let strategy = StandardRetryStrategy::default().with_base(|| 1.0); + let (ctx, rc, cfg) = set_up_cfg_and_context( + error_kind, + 3, + RetryConfig::standard() + .with_use_static_exponential_base(true) + .with_max_attempts(4), + ); + let strategy = StandardRetryStrategy::new(); let actual = strategy .should_attempt_retry(&ctx, &rc, &cfg) .expect("method is infallible for this use"); @@ -399,10 +396,14 @@ mod tests { fn dont_retry_when_out_of_attempts() { let current_attempts = 4; let max_attempts = current_attempts; - let (ctx, rc, cfg) = set_up_cfg_and_context(ErrorKind::TransientError, current_attempts); - let strategy = StandardRetryStrategy::default() - .with_base(|| 1.0) - .with_max_attempts(max_attempts); + let (ctx, rc, cfg) = set_up_cfg_and_context( + ErrorKind::TransientError, + current_attempts, + RetryConfig::standard() + .with_use_static_exponential_base(true) + .with_max_attempts(max_attempts), + ); + let strategy = StandardRetryStrategy::new(); let actual = strategy .should_attempt_retry(&ctx, &rc, &cfg) .expect("method is infallible for this use"); @@ -471,6 +472,7 @@ mod tests { #[cfg(feature = "test-util")] fn setup_test( retry_reasons: Vec, + retry_config: RetryConfig, ) -> (ConfigBag, RuntimeComponents, InterceptorContext) { let rc = RuntimeComponentsBuilder::for_tests() .with_retry_classifier(SharedRetryClassifier::new( @@ -478,7 +480,9 @@ mod tests { )) .build() .unwrap(); - let cfg = ConfigBag::base(); + let mut layer = Layer::new("test"); + layer.store_put(retry_config); + let cfg = ConfigBag::of_layers(vec![layer]); let mut ctx = InterceptorContext::new(Input::doesnt_matter()); // This type doesn't matter b/c the classifier will just return whatever we tell it to. ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter"))); @@ -489,10 +493,13 @@ mod tests { #[cfg(feature = "test-util")] #[test] fn eventual_success() { - let (mut cfg, rc, mut ctx) = setup_test(vec![RetryAction::server_error()]); - let strategy = StandardRetryStrategy::default() - .with_base(|| 1.0) - .with_max_attempts(5); + let (mut cfg, rc, mut ctx) = setup_test( + vec![RetryAction::server_error()], + RetryConfig::standard() + .with_use_static_exponential_base(true) + .with_max_attempts(5), + ); + let strategy = StandardRetryStrategy::new(); cfg.interceptor_state().store_put(TokenBucket::default()); let token_bucket = cfg.load::().unwrap().clone(); @@ -519,10 +526,13 @@ mod tests { #[cfg(feature = "test-util")] #[test] fn no_more_attempts() { - let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()]); - let strategy = StandardRetryStrategy::default() - .with_base(|| 1.0) - .with_max_attempts(3); + let (mut cfg, rc, ctx) = setup_test( + vec![RetryAction::server_error()], + RetryConfig::standard() + .with_use_static_exponential_base(true) + .with_max_attempts(3), + ); + let strategy = StandardRetryStrategy::new(); cfg.interceptor_state().store_put(TokenBucket::default()); let token_bucket = cfg.load::().unwrap().clone(); @@ -547,10 +557,13 @@ mod tests { #[cfg(feature = "test-util")] #[test] fn no_quota() { - let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()]); - let strategy = StandardRetryStrategy::default() - .with_base(|| 1.0) - .with_max_attempts(5); + let (mut cfg, rc, ctx) = setup_test( + vec![RetryAction::server_error()], + RetryConfig::standard() + .with_use_static_exponential_base(true) + .with_max_attempts(5), + ); + let strategy = StandardRetryStrategy::new(); cfg.interceptor_state().store_put(TokenBucket::new(5)); let token_bucket = cfg.load::().unwrap().clone(); @@ -569,16 +582,19 @@ mod tests { #[cfg(feature = "test-util")] #[test] fn quota_replenishes_on_success() { - let (mut cfg, rc, mut ctx) = setup_test(vec![ - RetryAction::transient_error(), - RetryAction::retryable_error_with_explicit_delay( - ErrorKind::TransientError, - Duration::from_secs(1), - ), - ]); - let strategy = StandardRetryStrategy::default() - .with_base(|| 1.0) - .with_max_attempts(5); + let (mut cfg, rc, mut ctx) = setup_test( + vec![ + RetryAction::transient_error(), + RetryAction::retryable_error_with_explicit_delay( + ErrorKind::TransientError, + Duration::from_secs(1), + ), + ], + RetryConfig::standard() + .with_use_static_exponential_base(true) + .with_max_attempts(5), + ); + let strategy = StandardRetryStrategy::new(); cfg.interceptor_state().store_put(TokenBucket::new(100)); let token_bucket = cfg.load::().unwrap().clone(); @@ -607,10 +623,13 @@ mod tests { #[test] fn quota_replenishes_on_first_try_success() { const PERMIT_COUNT: usize = 20; - let (mut cfg, rc, mut ctx) = setup_test(vec![RetryAction::transient_error()]); - let strategy = StandardRetryStrategy::default() - .with_base(|| 1.0) - .with_max_attempts(u32::MAX); + let (mut cfg, rc, mut ctx) = setup_test( + vec![RetryAction::transient_error()], + RetryConfig::standard() + .with_use_static_exponential_base(true) + .with_max_attempts(u32::MAX), + ); + let strategy = StandardRetryStrategy::new(); cfg.interceptor_state() .store_put(TokenBucket::new(PERMIT_COUNT)); let token_bucket = cfg.load::().unwrap().clone(); @@ -657,10 +676,13 @@ mod tests { #[cfg(feature = "test-util")] #[test] fn backoff_timing() { - let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()]); - let strategy = StandardRetryStrategy::default() - .with_base(|| 1.0) - .with_max_attempts(5); + let (mut cfg, rc, ctx) = setup_test( + vec![RetryAction::server_error()], + RetryConfig::standard() + .with_use_static_exponential_base(true) + .with_max_attempts(5), + ); + let strategy = StandardRetryStrategy::new(); cfg.interceptor_state().store_put(TokenBucket::default()); let token_bucket = cfg.load::().unwrap().clone(); @@ -697,12 +719,15 @@ mod tests { #[cfg(feature = "test-util")] #[test] fn max_backoff_time() { - let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()]); - let strategy = StandardRetryStrategy::default() - .with_base(|| 1.0) - .with_max_attempts(5) - .with_initial_backoff(Duration::from_secs(1)) - .with_max_backoff(Duration::from_secs(3)); + let (mut cfg, rc, ctx) = setup_test( + vec![RetryAction::server_error()], + RetryConfig::standard() + .with_use_static_exponential_base(true) + .with_max_attempts(5) + .with_initial_backoff(Duration::from_secs(1)) + .with_max_backoff(Duration::from_secs(3)), + ); + let strategy = StandardRetryStrategy::new(); cfg.interceptor_state().store_put(TokenBucket::default()); let token_bucket = cfg.load::().unwrap().clone(); diff --git a/rust-runtime/aws-smithy-runtime/src/client/retries/token_bucket.rs b/rust-runtime/aws-smithy-runtime/src/client/retries/token_bucket.rs index 686185b1d1..c2c8ba8c64 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/retries/token_bucket.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/retries/token_bucket.rs @@ -3,52 +3,12 @@ * SPDX-License-Identifier: Apache-2.0 */ -use crate::client::retries::RetryPartition; -use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugin; -use aws_smithy_types::config_bag::{FrozenLayer, Layer, Storable, StoreReplace}; +use aws_smithy_types::config_bag::{Storable, StoreReplace}; use aws_smithy_types::retry::ErrorKind; use std::sync::Arc; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tracing::trace; -/// A [`RuntimePlugin`] to provide a token bucket, usable by a retry strategy. -#[non_exhaustive] -#[derive(Debug, Default)] -pub struct TokenBucketRuntimePlugin { - token_bucket: TokenBucket, -} - -impl TokenBucketRuntimePlugin { - /// Creates a new `TokenBucketRuntimePlugin` with the given initial quota. - pub fn new(initial_tokens: usize) -> Self { - Self { - token_bucket: TokenBucket::new(initial_tokens), - } - } -} - -impl RuntimePlugin for TokenBucketRuntimePlugin { - fn config(&self) -> Option { - let mut cfg = Layer::new("standard token bucket"); - cfg.store_put(self.token_bucket.clone()); - - Some(cfg.freeze()) - } -} - -#[doc(hidden)] -#[non_exhaustive] -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub struct TokenBucketPartition { - retry_partition: RetryPartition, -} - -impl TokenBucketPartition { - pub fn new(retry_partition: RetryPartition) -> Self { - Self { retry_partition } - } -} - const DEFAULT_CAPACITY: usize = 500; const RETRY_COST: u32 = 5; const RETRY_TIMEOUT_COST: u32 = RETRY_COST * 2; diff --git a/rust-runtime/aws-smithy-types/src/retry.rs b/rust-runtime/aws-smithy-types/src/retry.rs index 911a37ca3a..964dbbdc6e 100644 --- a/rust-runtime/aws-smithy-types/src/retry.rs +++ b/rust-runtime/aws-smithy-types/src/retry.rs @@ -393,6 +393,12 @@ impl RetryConfig { self } + /// Set the maximum backoff time. + pub fn with_max_backoff(mut self, max_backoff: Duration) -> Self { + self.max_backoff = max_backoff; + self + } + /// Hint to the retry strategy whether to use a static exponential base. /// /// When a retry strategy uses exponential backoff, it calculates a random base. This causes the From ea3ae9620768fb652dbffc9c7eeef07cccfc3833 Mon Sep 17 00:00:00 2001 From: John DiSanti Date: Mon, 16 Oct 2023 13:52:54 -0700 Subject: [PATCH 2/4] Incorporate feedback --- .../client/FluentClientGenerator.kt | 15 ++- .../src/client/runtime_plugin.rs | 25 +++++ .../aws-smithy-runtime/src/client/defaults.rs | 98 +++++++++++-------- .../src/client/orchestrator/operation.rs | 17 +++- 4 files changed, 102 insertions(+), 53 deletions(-) diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt index 06d2c52b49..9dc28784ef 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt @@ -458,13 +458,18 @@ private fun baseClientRuntimePluginsFn(codegenContext: ClientCodegenContext): Ru ) -> #{RuntimePlugins} { let mut configured_plugins = #{Vec}::new(); ::std::mem::swap(&mut config.runtime_plugins, &mut configured_plugins); + + let defaults = [ + #{default_http_client_plugin}(), + #{default_retry_config_plugin}(${codegenContext.serviceShape.sdkId().dq()}), + #{default_sleep_impl_plugin}(), + #{default_time_source_plugin}(), + #{default_timeout_config_plugin}(), + ].into_iter().filter_map(|d| d); + let mut plugins = #{RuntimePlugins}::new() // defaults - .with_client_plugin(#{default_http_client_plugin}()) - .with_client_plugin(#{default_retry_config_plugin}(${codegenContext.serviceShape.sdkId().dq()})) - .with_client_plugin(#{default_sleep_impl_plugin}()) - .with_client_plugin(#{default_time_source_plugin}()) - .with_client_plugin(#{default_timeout_config_plugin}()) + .with_client_plugins(defaults) // user config .with_client_plugin( #{StaticRuntimePlugin}::new() diff --git a/rust-runtime/aws-smithy-runtime-api/src/client/runtime_plugin.rs b/rust-runtime/aws-smithy-runtime-api/src/client/runtime_plugin.rs index 4260b6af81..85b8f29ef7 100644 --- a/rust-runtime/aws-smithy-runtime-api/src/client/runtime_plugin.rs +++ b/rust-runtime/aws-smithy-runtime-api/src/client/runtime_plugin.rs @@ -236,10 +236,22 @@ pub struct RuntimePlugins { } impl RuntimePlugins { + /// Create a new empty set of runtime plugins. pub fn new() -> Self { Default::default() } + /// Add several client-level runtime plugins from an iterator. + pub fn with_client_plugins( + mut self, + plugins: impl IntoIterator, + ) -> Self { + for plugin in plugins.into_iter() { + self = self.with_client_plugin(plugin); + } + self + } + /// Adds a client-level runtime plugin. pub fn with_client_plugin(mut self, plugin: impl RuntimePlugin + 'static) -> Self { insert_plugin!( @@ -249,6 +261,17 @@ impl RuntimePlugins { self } + /// Add several operation-level runtime plugins from an iterator. + pub fn with_operation_plugins( + mut self, + plugins: impl IntoIterator, + ) -> Self { + for plugin in plugins.into_iter() { + self = self.with_operation_plugin(plugin); + } + self + } + /// Adds an operation-level runtime plugin. pub fn with_operation_plugin(mut self, plugin: impl RuntimePlugin + 'static) -> Self { insert_plugin!( @@ -258,6 +281,7 @@ impl RuntimePlugins { self } + /// Apply the client-level runtime plugins' config to the given config bag. pub fn apply_client_configuration( &self, cfg: &mut ConfigBag, @@ -265,6 +289,7 @@ impl RuntimePlugins { apply_plugins!(client, self.client_plugins, cfg) } + /// Apply the operation-level runtime plugins' config to the given config bag. pub fn apply_operation_configuration( &self, cfg: &mut ConfigBag, diff --git a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs index a61372554f..2cc91fb461 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs @@ -19,74 +19,86 @@ use aws_smithy_runtime_api::client::runtime_plugin::{ Order, SharedRuntimePlugin, StaticRuntimePlugin, }; use aws_smithy_runtime_api::shared::IntoShared; -use aws_smithy_types::config_bag::Layer; +use aws_smithy_types::config_bag::{FrozenLayer, Layer}; use aws_smithy_types::retry::RetryConfig; use aws_smithy_types::timeout::TimeoutConfig; use std::borrow::Cow; +fn default_plugin(name: &'static str, components_fn: CompFn) -> StaticRuntimePlugin +where + CompFn: FnOnce(RuntimeComponentsBuilder) -> RuntimeComponentsBuilder, +{ + StaticRuntimePlugin::new() + .with_order(Order::Defaults) + .with_runtime_components((components_fn)(RuntimeComponentsBuilder::new(name))) +} + +fn layer(name: &'static str, layer_fn: LayerFn) -> FrozenLayer +where + LayerFn: FnOnce(&mut Layer) -> (), +{ + let mut layer = Layer::new(name); + (layer_fn)(&mut layer); + layer.freeze() +} + /// Runtime plugin that provides a default connector. -pub fn default_http_client_plugin() -> SharedRuntimePlugin { +pub fn default_http_client_plugin() -> Option { let _default: Option = None; #[cfg(feature = "connector-hyper-0-14-x")] let _default = crate::client::http::hyper_014::default_client(); - StaticRuntimePlugin::new() - .with_order(Order::Defaults) - .with_runtime_components( - RuntimeComponentsBuilder::new("default_http_client_plugin").with_http_client(_default), - ) + _default.map(|default| { + default_plugin("default_http_client_plugin", |components| { + components.with_http_client(Some(default)) + }) .into_shared() + }) } /// Runtime plugin that provides a default async sleep implementation. -pub fn default_sleep_impl_plugin() -> SharedRuntimePlugin { - StaticRuntimePlugin::new() - .with_order(Order::Defaults) - .with_runtime_components( - RuntimeComponentsBuilder::new("default_sleep_impl_plugin") - .with_sleep_impl(default_async_sleep()), - ) +pub fn default_sleep_impl_plugin() -> Option { + default_async_sleep().map(|default| { + default_plugin("default_sleep_impl_plugin", |components| { + components.with_sleep_impl(Some(default)) + }) .into_shared() + }) } /// Runtime plugin that provides a default time source. -pub fn default_time_source_plugin() -> SharedRuntimePlugin { - StaticRuntimePlugin::new() - .with_order(Order::Defaults) - .with_runtime_components( - RuntimeComponentsBuilder::new("default_time_source_plugin") - .with_time_source(Some(SystemTimeSource::new())), - ) - .into_shared() +pub fn default_time_source_plugin() -> Option { + Some( + default_plugin("default_time_source_plugin", |components| { + components.with_time_source(Some(SystemTimeSource::new())) + }) + .into_shared(), + ) } /// Runtime plugin that sets the default retry strategy, config (disabled), and partition. pub fn default_retry_config_plugin( default_partition_name: impl Into>, -) -> SharedRuntimePlugin { - StaticRuntimePlugin::new() - .with_order(Order::Defaults) - .with_runtime_components( - RuntimeComponentsBuilder::new("default_retry_config_plugin") - .with_retry_strategy(Some(StandardRetryStrategy::new())), - ) - .with_config({ - let mut layer = Layer::new("default_retry_config"); +) -> Option { + Some( + default_plugin("default_retry_config_plugin", |components| { + components.with_retry_strategy(Some(StandardRetryStrategy::new())) + }) + .with_config(layer("default_retry_config", |layer| { layer.store_put(RetryConfig::disabled()); layer.store_put(RetryPartition::new(default_partition_name)); - layer.freeze() - }) - .into_shared() + })) + .into_shared(), + ) } /// Runtime plugin that sets the default timeout config (no timeouts). -pub fn default_timeout_config_plugin() -> SharedRuntimePlugin { - StaticRuntimePlugin::new() - .with_order(Order::Defaults) - .with_config({ - let mut layer = Layer::new("default_timeout_config"); - layer.store_put(TimeoutConfig::disabled()); - layer.freeze() - }) - .into_shared() +pub fn default_timeout_config_plugin() -> Option { + Some( + default_plugin("default_timeout_config_plugin", |c| c) + .with_config(layer("default_timeout_config", |layer| { + layer.store_put(TimeoutConfig::disabled()); + })) + .into_shared(), + ) } diff --git a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs index 883e8fb4e9..0d89a8f6bf 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs @@ -324,12 +324,19 @@ impl OperationBuilder { pub fn build(self) -> Operation { let service_name = self.service_name.expect("service_name required"); let operation_name = self.operation_name.expect("operation_name required"); + + let defaults = [ + default_http_client_plugin(), + default_retry_config_plugin(service_name.clone()), + default_sleep_impl_plugin(), + default_time_source_plugin(), + default_timeout_config_plugin(), + ] + .into_iter() + .flat_map(|d| d); + let mut runtime_plugins = RuntimePlugins::new() - .with_client_plugin(default_http_client_plugin()) - .with_client_plugin(default_retry_config_plugin(service_name.clone())) - .with_client_plugin(default_sleep_impl_plugin()) - .with_client_plugin(default_time_source_plugin()) - .with_client_plugin(default_timeout_config_plugin()) + .with_client_plugins(defaults) .with_client_plugin( StaticRuntimePlugin::new() .with_config(self.config.freeze()) From 1bebbe75cda38bb536f5c2d73c815a87332b66db Mon Sep 17 00:00:00 2001 From: John DiSanti Date: Mon, 16 Oct 2023 14:01:12 -0700 Subject: [PATCH 3/4] Fix clippy lints --- .../client/smithy/generators/client/FluentClientGenerator.kt | 2 +- rust-runtime/aws-smithy-runtime/src/client/defaults.rs | 2 +- .../aws-smithy-runtime/src/client/orchestrator/operation.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt index 9dc28784ef..dfd11ed295 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt @@ -465,7 +465,7 @@ private fun baseClientRuntimePluginsFn(codegenContext: ClientCodegenContext): Ru #{default_sleep_impl_plugin}(), #{default_time_source_plugin}(), #{default_timeout_config_plugin}(), - ].into_iter().filter_map(|d| d); + ].into_iter().flatten(); let mut plugins = #{RuntimePlugins}::new() // defaults diff --git a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs index 2cc91fb461..6bba117144 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs @@ -35,7 +35,7 @@ where fn layer(name: &'static str, layer_fn: LayerFn) -> FrozenLayer where - LayerFn: FnOnce(&mut Layer) -> (), + LayerFn: FnOnce(&mut Layer), { let mut layer = Layer::new(name); (layer_fn)(&mut layer); diff --git a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs index cbcd8746a7..4777f313a6 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs @@ -333,7 +333,7 @@ impl OperationBuilder { default_timeout_config_plugin(), ] .into_iter() - .flat_map(|d| d); + .flatten(); let mut runtime_plugins = RuntimePlugins::new() .with_client_plugins(defaults) From 652f2ec79ad50c3583d20239a9bebf48bf1eb9f8 Mon Sep 17 00:00:00 2001 From: John DiSanti Date: Mon, 16 Oct 2023 14:51:59 -0700 Subject: [PATCH 4/4] Fix tests --- .../customizations/SensitiveOutputDecoratorTest.kt | 8 ++++++-- .../ConfigOverrideRuntimePluginGeneratorTest.kt | 10 ++++++++-- .../smithy/rust/codegen/core/rustlang/RustType.kt | 1 - 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customizations/SensitiveOutputDecoratorTest.kt b/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customizations/SensitiveOutputDecoratorTest.kt index bf634170ca..66047aca47 100644 --- a/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customizations/SensitiveOutputDecoratorTest.kt +++ b/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customizations/SensitiveOutputDecoratorTest.kt @@ -8,6 +8,7 @@ package software.amazon.smithy.rust.codegen.client.smithy.customizations import org.junit.jupiter.api.Test import software.amazon.smithy.rust.codegen.client.testutil.clientIntegrationTest import software.amazon.smithy.rust.codegen.core.rustlang.Attribute +import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate import software.amazon.smithy.rust.codegen.core.smithy.RuntimeConfig import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType @@ -16,6 +17,8 @@ import software.amazon.smithy.rust.codegen.core.testutil.integrationTest class SensitiveOutputDecoratorTest { private fun codegenScope(runtimeConfig: RuntimeConfig): Array> = arrayOf( + "capture_test_logs" to CargoDependency.smithyRuntimeTestUtil(runtimeConfig).toType() + .resolve("test_util::capture_test_logs::capture_test_logs"), "capture_request" to RuntimeType.captureRequest(runtimeConfig), "SdkBody" to RuntimeType.sdkBody(runtimeConfig), ) @@ -48,10 +51,10 @@ class SensitiveOutputDecoratorTest { rustCrate.integrationTest("redacting_sensitive_response_body") { val moduleName = codegenContext.moduleUseName() Attribute.TokioTest.render(this) - Attribute.TracedTest.render(this) rustTemplate( """ async fn redacting_sensitive_response_body() { + let (_logs, logs_rx) = #{capture_test_logs}(); let (http_client, _r) = #{capture_request}(Some( http::Response::builder() .status(200) @@ -69,7 +72,8 @@ class SensitiveOutputDecoratorTest { .await .expect("success"); - assert!(logs_contain("** REDACTED **")); + let log_contents = logs_rx.contents(); + assert!(log_contents.contains("** REDACTED **")); } """, *codegenScope(codegenContext.runtimeConfig), diff --git a/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/ConfigOverrideRuntimePluginGeneratorTest.kt b/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/ConfigOverrideRuntimePluginGeneratorTest.kt index f167d35767..5e994ee780 100644 --- a/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/ConfigOverrideRuntimePluginGeneratorTest.kt +++ b/codegen-client/src/test/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/ConfigOverrideRuntimePluginGeneratorTest.kt @@ -154,7 +154,7 @@ internal class ConfigOverrideRuntimePluginGeneratorTest { } @Test - fun `operation overrides retry strategy`() { + fun `operation overrides retry config`() { clientIntegrationTest(model) { clientCodegenContext, rustCrate -> val runtimeConfig = clientCodegenContext.runtimeConfig val codegenScope = arrayOf( @@ -176,11 +176,13 @@ internal class ConfigOverrideRuntimePluginGeneratorTest { .resolve("client::retries::RetryClassifiers"), "RuntimeComponentsBuilder" to RuntimeType.runtimeComponentsBuilder(runtimeConfig), "RuntimePlugin" to RuntimeType.runtimePlugin(runtimeConfig), + "StandardRetryStrategy" to RuntimeType.smithyRuntime(runtimeConfig) + .resolve("client::retries::strategy::StandardRetryStrategy"), "ShouldAttempt" to RuntimeType.smithyRuntimeApi(runtimeConfig) .resolve("client::retries::ShouldAttempt"), ) rustCrate.testModule { - unitTest("test_operation_overrides_retry_strategy") { + unitTest("test_operation_overrides_retry_config") { rustTemplate( """ use #{RuntimePlugin}; @@ -205,6 +207,8 @@ internal class ConfigOverrideRuntimePluginGeneratorTest { // Emulate the merging of runtime components from runtime plugins that the orchestrator does let runtime_components = #{RuntimeComponentsBuilder}::for_tests() + // emulate the default retry config plugin by setting a retry strategy + .with_retry_strategy(#{Some}(#{StandardRetryStrategy}::new())) .merge_from(&client_config.runtime_components) .merge_from(&retry_classifiers_component) .build() @@ -231,6 +235,8 @@ internal class ConfigOverrideRuntimePluginGeneratorTest { // Emulate the merging of runtime components from runtime plugins that the orchestrator does let runtime_components = #{RuntimeComponentsBuilder}::for_tests() + // emulate the default retry config plugin by setting a retry strategy + .with_retry_strategy(#{Some}(#{StandardRetryStrategy}::new())) .merge_from(&client_config.runtime_components) .merge_from(&retry_classifiers_component) .merge_from(&config_override.runtime_components) diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/RustType.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/RustType.kt index a3d5d568d8..5a46d9fa11 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/RustType.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/RustType.kt @@ -524,7 +524,6 @@ class Attribute(val inner: Writable, val isDeriveHelper: Boolean = false) { val Test = Attribute("test") val TokioTest = Attribute(RuntimeType.Tokio.resolve("test").writable) - val TracedTest = Attribute(RuntimeType.TracingTest.resolve("traced_test").writable) val AwsSdkUnstableAttribute = Attribute(cfg("aws_sdk_unstable")) /**