From 68207c4ccff6a31b4ce96ab498a49d371ceaaa9e Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Tue, 3 Sep 2024 16:30:48 +0200 Subject: [PATCH 1/3] cache S3Client in factory --- .../object_storage/s3_compatible_storage.rs | 18 +++++++++++---- .../s3_compatible_storage_resolver.rs | 22 ++++++++++++++++--- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 335747fe638..165f6bb1f94 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -127,7 +127,7 @@ fn get_region(s3_storage_config: &S3StorageConfig) -> Option<Region> { }) } -async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client { +pub(crate) async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client { let aws_config = get_aws_config().await; let credentials_provider = get_credentials_provider(s3_storage_config).or(aws_config.credentials_provider()); @@ -155,13 +155,13 @@ async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client { } impl S3CompatibleObjectStorage { - /// Creates an object storage given a region and a bucket name. + /// Creates an object storage given a region, a bucket name and an S3 client. 
pub async fn new( s3_storage_config: &S3StorageConfig, uri: Uri, bucket: String, + s3_client: S3Client, ) -> Result<Self, StorageResolverError> { - let s3_client = create_s3_client(s3_storage_config).await; let retry_params = RetryParams::aggressive(); let disable_multi_object_delete = s3_storage_config.disable_multi_object_delete; let disable_multipart_upload = s3_storage_config.disable_multipart_upload; @@ -181,12 +181,22 @@ impl S3CompatibleObjectStorage { pub async fn from_uri( s3_storage_config: &S3StorageConfig, uri: &Uri, + ) -> Result<Self, StorageResolverError> { + let s3_client = create_s3_client(s3_storage_config).await; + Self::from_uri_and_client(s3_storage_config, uri, s3_client).await + } + + /// Creates an object storage given a region, an uri and an S3 client. + pub async fn from_uri_and_client( + s3_storage_config: &S3StorageConfig, + uri: &Uri, + s3_client: S3Client, ) -> Result<Self, StorageResolverError> { let (bucket, prefix) = parse_s3_uri(uri).ok_or_else(|| { let message = format!("failed to extract bucket name from S3 URI: {uri}"); StorageResolverError::InvalidUri(message) })?; - let storage = Self::new(s3_storage_config, uri.clone(), bucket).await?; + let storage = Self::new(s3_storage_config, uri.clone(), bucket, s3_client).await?; Ok(storage.with_prefix(prefix)) } diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs index 580f4bf89c4..7f9f1804259 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs @@ -17,12 +17,14 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <http://www.gnu.org/licenses/>. 
-use std::sync::Arc; +use std::sync::{Arc, Mutex}; use async_trait::async_trait; +use aws_sdk_s3::Client as S3Client; use quickwit_common::uri::Uri; use quickwit_config::{S3StorageConfig, StorageBackend}; +use super::s3_compatible_storage::create_s3_client; use crate::{ DebouncedStorage, S3CompatibleObjectStorage, Storage, StorageFactory, StorageResolverError, }; @@ -30,12 +32,16 @@ use crate::{ /// S3 compatible object storage resolver. pub struct S3CompatibleObjectStorageFactory { storage_config: S3StorageConfig, + s3_client: Mutex<Option<S3Client>>, } impl S3CompatibleObjectStorageFactory { /// Creates a new S3-compatible storage factory. pub fn new(storage_config: S3StorageConfig) -> Self { - Self { storage_config } + Self { + storage_config, + s3_client: Mutex::new(None), + } } } @@ -46,7 +52,17 @@ impl StorageFactory for S3CompatibleObjectStorageFactory { } async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> { - let storage = S3CompatibleObjectStorage::from_uri(&self.storage_config, uri).await?; + let s3_client_opt = self.s3_client.lock().unwrap().clone(); + let s3_client = if let Some(s3_client) = s3_client_opt { + s3_client + } else { + let s3_client = create_s3_client(&self.storage_config).await; + *self.s3_client.lock().unwrap() = Some(s3_client.clone()); + s3_client + }; + let storage = + S3CompatibleObjectStorage::from_uri_and_client(&self.storage_config, uri, s3_client) + .await?; Ok(Arc::new(DebouncedStorage::new(storage))) } } From 25ef787428a7b5bf35c87f580475bcde8947b623 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 4 Sep 2024 12:04:59 +0200 Subject: [PATCH 2/3] use tokio oncecell instead of mutex --- .../s3_compatible_storage_resolver.rs | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs index 7f9f1804259..cc5d37253ec 100644 --- 
a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs @@ -17,12 +17,13 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <http://www.gnu.org/licenses/>. -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use async_trait::async_trait; use aws_sdk_s3::Client as S3Client; use quickwit_common::uri::Uri; use quickwit_config::{S3StorageConfig, StorageBackend}; +use tokio::sync::OnceCell; use super::s3_compatible_storage::create_s3_client; use crate::{ @@ -32,7 +33,11 @@ use crate::{ /// S3 compatible object storage resolver. pub struct S3CompatibleObjectStorageFactory { storage_config: S3StorageConfig, - s3_client: Mutex<Option<S3Client>>, + // we cache the S3Client so we don't rebuild one every time we need to connect to S3. + // We don't build it in advance because we don't know if this factory is one that will + // end up being used, or if something like azure, gcs, or even local files, will be used + // instead. 
+ s3_client: OnceCell<S3Client>, } impl S3CompatibleObjectStorageFactory { @@ -40,7 +45,7 @@ impl S3CompatibleObjectStorageFactory { pub fn new(storage_config: S3StorageConfig) -> Self { Self { storage_config, - s3_client: Mutex::new(None), + s3_client: OnceCell::new(), } } } @@ -52,14 +57,11 @@ impl StorageFactory for S3CompatibleObjectStorageFactory { } async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> { - let s3_client_opt = self.s3_client.lock().unwrap().clone(); - let s3_client = if let Some(s3_client) = s3_client_opt { - s3_client - } else { - let s3_client = create_s3_client(&self.storage_config).await; - *self.s3_client.lock().unwrap() = Some(s3_client.clone()); - s3_client - }; + let s3_client = self + .s3_client + .get_or_init(|| create_s3_client(&self.storage_config)) + .await + .clone(); let storage = S3CompatibleObjectStorage::from_uri_and_client(&self.storage_config, uri, s3_client) .await?; From d02d239d86878c6d2e4e5020d338b9e973373d50 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 4 Sep 2024 13:51:10 +0200 Subject: [PATCH 3/3] remove unused constructor for s3 storage --- .../object_storage/s3_compatible_storage.rs | 39 +++++++------------ .../s3_compatible_storage_resolver.rs | 3 +- 2 files changed, 16 insertions(+), 26 deletions(-) diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 165f6bb1f94..95a030e560f 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -127,7 +127,7 @@ fn get_region(s3_storage_config: &S3StorageConfig) -> Option<Region> { }) } -pub(crate) async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client { +pub async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client { let aws_config = get_aws_config().await; let credentials_provider = 
get_credentials_provider(s3_storage_config).or(aws_config.credentials_provider()); @@ -155,28 +155,6 @@ pub(crate) async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3C } impl S3CompatibleObjectStorage { - /// Creates an object storage given a region, a bucket name and an S3 client. - pub async fn new( - s3_storage_config: &S3StorageConfig, - uri: Uri, - bucket: String, - s3_client: S3Client, - ) -> Result<Self, StorageResolverError> { - let retry_params = RetryParams::aggressive(); - let disable_multi_object_delete = s3_storage_config.disable_multi_object_delete; - let disable_multipart_upload = s3_storage_config.disable_multipart_upload; - Ok(Self { - s3_client, - uri, - bucket, - prefix: PathBuf::new(), - multipart_policy: MultiPartPolicy::default(), - retry_params, - disable_multi_object_delete, - disable_multipart_upload, - }) - } - /// Creates an object storage given a region and an uri. pub async fn from_uri( s3_storage_config: &S3StorageConfig, @@ -196,8 +174,19 @@ impl S3CompatibleObjectStorage { let message = format!("failed to extract bucket name from S3 URI: {uri}"); StorageResolverError::InvalidUri(message) })?; - let storage = Self::new(s3_storage_config, uri.clone(), bucket, s3_client).await?; - Ok(storage.with_prefix(prefix)) + let retry_params = RetryParams::aggressive(); + let disable_multi_object_delete = s3_storage_config.disable_multi_object_delete; + let disable_multipart_upload = s3_storage_config.disable_multipart_upload; + Ok(Self { + s3_client, + uri: uri.clone(), + bucket, + prefix, + multipart_policy: MultiPartPolicy::default(), + retry_params, + disable_multi_object_delete, + disable_multipart_upload, + }) } /// Sets a specific for all buckets. 
diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs index cc5d37253ec..ae746f0a26b 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs @@ -33,7 +33,8 @@ use crate::{ /// S3 compatible object storage resolver. pub struct S3CompatibleObjectStorageFactory { storage_config: S3StorageConfig, - // we cache the S3Client so we don't rebuild one every time we need to connect to S3. + // we cache the S3Client so we don't rebuild one every time we build a new Storage (for + // every search query). // We don't build it in advance because we don't know if this factory is one that will // end up being used, or if something like azure, gcs, or even local files, will be used // instead.