Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rfc43: fix identity cache partition #3566

Merged
merged 1 commit into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,25 @@
# references = ["smithy-rs#920"]
# meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client | server | all"}
# author = "rcoh"
[[aws-sdk-rust]]
message = """
Fixes the identity resolver types (`credentials_provider()` and `token_provider()`) from `SdkConfig` to have
a consistent identity cache partition when re-used across different clients.
"""
references = ["smithy-rs#3427"]
meta = { "breaking" = false, "tada" = false, "bug" = true }
authors = ["aajtodd"]


[[smithy-rs]]
message = """
`SharedIdentityResolver` now respects an existing cache partition when the `ResolveIdentity` implementation
provides one already.
"""
references = ["smithy-rs#3427"]
meta = { "breaking" = false, "tada" = false, "bug" = true }
authors = ["aajtodd"]

[[smithy-rs]]
message = """
Stalled stream protection now supports request upload streams. It is currently off by default, but will be enabled by default in a future release. To enable it now, you can do the following:
Expand Down
2 changes: 1 addition & 1 deletion aws/rust-runtime/aws-credential-types/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aws-credential-types"
version = "1.1.8"
version = "1.2.0"
authors = ["AWS Rust SDK Team <[email protected]>"]
description = "Types for AWS SDK credentials."
edition = "2021"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ construct credentials from hardcoded values.
//! ```

use crate::Credentials;
use aws_smithy_runtime_api::client::identity::{Identity, IdentityFuture, ResolveIdentity};
use aws_smithy_runtime_api::client::identity::{
Identity, IdentityCachePartition, IdentityFuture, ResolveIdentity,
};
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace};
use std::sync::Arc;
Expand Down Expand Up @@ -124,15 +126,15 @@ impl ProvideCredentials for Arc<dyn ProvideCredentials> {
/// Newtype wrapper around ProvideCredentials that implements Clone using an internal
/// Arc.
#[derive(Clone, Debug)]
pub struct SharedCredentialsProvider(Arc<dyn ProvideCredentials>);
pub struct SharedCredentialsProvider(Arc<dyn ProvideCredentials>, IdentityCachePartition);

impl SharedCredentialsProvider {
/// Create a new SharedCredentials provider from `ProvideCredentials`
///
/// The given provider will be wrapped in an internal `Arc`. If your
/// provider is already in an `Arc`, use `SharedCredentialsProvider::from(provider)` instead.
pub fn new(provider: impl ProvideCredentials + 'static) -> Self {
Self(Arc::new(provider))
Self(Arc::new(provider), IdentityCachePartition::new())
}
}

Expand All @@ -144,7 +146,7 @@ impl AsRef<dyn ProvideCredentials> for SharedCredentialsProvider {

impl From<Arc<dyn ProvideCredentials>> for SharedCredentialsProvider {
fn from(provider: Arc<dyn ProvideCredentials>) -> Self {
SharedCredentialsProvider(provider)
SharedCredentialsProvider(provider, IdentityCachePartition::new())
}
}

Expand Down Expand Up @@ -173,4 +175,28 @@ impl ResolveIdentity for SharedCredentialsProvider {
fn fallback_on_interrupt(&self) -> Option<Identity> {
ProvideCredentials::fallback_on_interrupt(self).map(|creds| creds.into())
}

fn cache_partition(&self) -> Option<IdentityCachePartition> {
Some(self.1)
}
}

#[cfg(test)]
mod tests {
use aws_smithy_runtime_api::client::identity::SharedIdentityResolver;

use super::*;

#[test]
fn reuses_cache_partition() {
let creds = Credentials::new("AKID", "SECRET", None, None, "test");
let provider = SharedCredentialsProvider::new(creds);
let partition = provider.cache_partition();
assert!(partition.is_some());

let identity_resolver = SharedIdentityResolver::new(provider);
let identity_partition = identity_resolver.cache_partition();

assert!(partition.unwrap() == identity_partition);
}
}
32 changes: 28 additions & 4 deletions aws/rust-runtime/aws-credential-types/src/provider/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use crate::{provider::error::TokenError, provider::future, Token};
use aws_smithy_runtime_api::client::{
identity::{IdentityFuture, ResolveIdentity},
identity::{IdentityCachePartition, IdentityFuture, ResolveIdentity},
runtime_components::RuntimeComponents,
};
use aws_smithy_runtime_api::impl_shared_conversions;
Expand Down Expand Up @@ -45,15 +45,15 @@ impl ProvideToken for Token {
///
/// Newtype wrapper around [`ProvideToken`] that implements `Clone` using an internal `Arc`.
#[derive(Clone, Debug)]
pub struct SharedTokenProvider(Arc<dyn ProvideToken>);
pub struct SharedTokenProvider(Arc<dyn ProvideToken>, IdentityCachePartition);

impl SharedTokenProvider {
/// Create a new [`SharedTokenProvider`] from [`ProvideToken`].
///
/// The given provider will be wrapped in an internal `Arc`. If your
/// provider is already in an `Arc`, use `SharedTokenProvider::from(provider)` instead.
pub fn new(provider: impl ProvideToken + 'static) -> Self {
Self(Arc::new(provider))
Self(Arc::new(provider), IdentityCachePartition::new())
}
}

Expand All @@ -65,7 +65,7 @@ impl AsRef<dyn ProvideToken> for SharedTokenProvider {

impl From<Arc<dyn ProvideToken>> for SharedTokenProvider {
fn from(provider: Arc<dyn ProvideToken>) -> Self {
SharedTokenProvider(provider)
SharedTokenProvider(provider, IdentityCachePartition::new())
}
}

Expand All @@ -86,6 +86,30 @@ impl ResolveIdentity for SharedTokenProvider {
) -> IdentityFuture<'a> {
IdentityFuture::new(async move { Ok(self.provide_token().await?.into()) })
}

fn cache_partition(&self) -> Option<IdentityCachePartition> {
Some(self.1)
}
}

impl_shared_conversions!(convert SharedTokenProvider from ProvideToken using SharedTokenProvider::new);

#[cfg(test)]
mod tests {
use aws_smithy_runtime_api::client::identity::SharedIdentityResolver;

use super::*;

#[test]
fn reuses_cache_partition() {
let token = Token::new("token", None);
let provider = SharedTokenProvider::new(token);
let partition = provider.cache_partition();
assert!(partition.is_some());

let identity_resolver = SharedIdentityResolver::new(provider);
let identity_partition = identity_resolver.cache_partition();

assert!(partition.unwrap() == identity_partition);
}
}
119 changes: 119 additions & 0 deletions aws/sdk/integration-tests/s3/tests/identity-cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;

use aws_config::{identity::IdentityCache, BehaviorVersion, Region};
use aws_credential_types::{
provider::{future::ProvideCredentials as ProvideCredentialsFuture, ProvideCredentials},
Credentials,
};
use aws_sdk_s3::Client;
use aws_smithy_runtime::client::http::test_util::infallible_client_fn;

// NOTE: These tests are _not_ S3 specific and would apply to any AWS SDK but due to the need to consume `aws-config`
// (which depends on relocated runtime crates) we can't make this an `awsSdkIntegrationTest(..)`.

#[tokio::test]
async fn test_identity_cache_reused_by_default() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an integration test is service agnostic, it's a good practice we write it in Kotlin. Since the test uses credentials_provider, you can use awsSdkIntegrationTest (example).

Copy link
Contributor Author

@aajtodd aajtodd Apr 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this end up on every generated client? If so is that what we want here?

It does use credentials_provider but with fake/test credentials. It really is just testing the cache behavior and interaction between SdkConfig and generated client config. It doesn't make a real request.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it does. awsSdkIntegrationTest is a test helper that tests codegen itself but does not run against generated AWS SDKs.

It really is just testing the cache behavior and interaction between SdkConfig and generated client config. It doesn't make a real request.

That does sound like service-agnostic, and that's what awsSdkIntegrationTest is for

Copy link
Contributor

@ysaito1001 ysaito1001 Apr 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just throwing one last thing on the table before leaving tests in integration-tests/s3, can we directly construct SdkConfig in tests with necessary fields without going through aws_config::defaults? I'm assuming the tests won't need default configs in order to verify what they are supposed to test? If we don't use aws-config, then we may be able to avoid the problem we discussed offline.

But I am ok with what we have, and wanted to double-check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can which would allow us to test some of this but not how different behavior versions behave in loading (which still require a subsequent PR to fully implement) if I understand correctly how it's all wired.

let http_client =
infallible_client_fn(|_req| http::Response::builder().status(200).body("OK!").unwrap());

let provider = TestCredProvider::new();
let cache = IdentityCache::lazy().build();
let config = aws_config::defaults(BehaviorVersion::latest())
.http_client(http_client)
.credentials_provider(provider.clone())
// TODO(rfc-43) - remove adding a cache when this is the new default
.identity_cache(cache)
.region(Region::new("us-west-2"))
.load()
.await;

let c1 = Client::new(&config);
let _ = c1.list_buckets().send().await;
assert_eq!(1, provider.invoke_count.load(Ordering::SeqCst));

let c2 = Client::new(&config);
let _ = c2.list_buckets().send().await;
assert_eq!(1, provider.invoke_count.load(Ordering::SeqCst));
}

// TODO(rfc-43) - add no_identity_cache() to ConfigLoader and re-enable test
// #[tokio::test]
// async fn test_identity_cache_explicit_unset() {
// let http_client =
// infallible_client_fn(|_req| http::Response::builder().status(200).body("OK!").unwrap());
//
// let provider = TestCredProvider::new();
//
// let config = aws_config::defaults(BehaviorVersion::latest())
// .no_identity_cache()
// .http_client(http_client)
// .credentials_provider(provider.clone())
// .region(Region::new("us-west-2"))
// .load()
// .await;
//
// let c1 = Client::new(&config);
// let _ = c1.list_buckets().send().await;
// assert_eq!(1, provider.invoke_count.load(Ordering::SeqCst));
//
// let c2 = Client::new(&config);
// let _ = c2.list_buckets().send().await;
// assert_eq!(2, provider.invoke_count.load(Ordering::SeqCst));
// }

#[tokio::test]
async fn test_identity_cache_ga_behavior_version() {
let http_client =
infallible_client_fn(|_req| http::Response::builder().status(200).body("OK!").unwrap());

let provider = TestCredProvider::new();

// no cache is defined in this behavior version by default so each client should get their own
let config = aws_config::defaults(BehaviorVersion::v2023_11_09())
.http_client(http_client)
.credentials_provider(provider.clone())
.region(Region::new("us-west-2"))
.load()
.await;

let c1 = Client::new(&config);
let _ = c1.list_buckets().send().await;
assert_eq!(1, provider.invoke_count.load(Ordering::SeqCst));

let c2 = Client::new(&config);
let _ = c2.list_buckets().send().await;
assert_eq!(2, provider.invoke_count.load(Ordering::SeqCst));
}

#[derive(Clone, Debug)]
struct TestCredProvider {
invoke_count: Arc<AtomicI32>,
creds: Credentials,
}

impl TestCredProvider {
fn new() -> Self {
TestCredProvider {
invoke_count: Arc::new(AtomicI32::default()),
creds: Credentials::for_tests(),
}
}
}

impl ProvideCredentials for TestCredProvider {
fn provide_credentials<'a>(
&'a self,
) -> aws_credential_types::provider::future::ProvideCredentials<'a>
where
Self: 'a,
{
self.invoke_count.fetch_add(1, Ordering::SeqCst);
ProvideCredentialsFuture::ready(Ok(self.creds.clone()))
}
}
20 changes: 19 additions & 1 deletion rust-runtime/aws-smithy-runtime-api/src/client/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ pub trait ResolveIdentity: Send + Sync + Debug {
fn cache_location(&self) -> IdentityCacheLocation {
IdentityCacheLocation::RuntimeComponents
}

/// Returns the identity cache partition associated with this identity resolver.
///
/// By default this returns `None` and cache partitioning is left up to `SharedIdentityResolver`.
fn cache_partition(&self) -> Option<IdentityCachePartition> {
None
}
}

/// Cache location for identity caching.
Expand Down Expand Up @@ -195,9 +202,16 @@ pub struct SharedIdentityResolver {
impl SharedIdentityResolver {
/// Creates a new [`SharedIdentityResolver`] from the given resolver.
pub fn new(resolver: impl ResolveIdentity + 'static) -> Self {
// NOTE: `IdentityCachePartition` is globally unique by construction so even
// custom implementations of `ResolveIdentity::cache_partition()` are unique.
let partition = match resolver.cache_partition() {
Some(p) => p,
None => IdentityCachePartition::new(),
};

Self {
inner: Arc::new(resolver),
cache_partition: IdentityCachePartition::new(),
cache_partition: partition,
}
}

Expand All @@ -222,6 +236,10 @@ impl ResolveIdentity for SharedIdentityResolver {
fn cache_location(&self) -> IdentityCacheLocation {
self.inner.cache_location()
}

fn cache_partition(&self) -> Option<IdentityCachePartition> {
Some(self.cache_partition())
}
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved
}

impl_shared_conversions!(convert SharedIdentityResolver from ResolveIdentity using SharedIdentityResolver::new);
Expand Down