Skip to content

Commit

Permalink
cache S3Client in factory
Browse files Browse the repository at this point in the history
  • Loading branch information
trinity-1686a committed Sep 3, 2024
1 parent fc5ef16 commit 68207c4
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ fn get_region(s3_storage_config: &S3StorageConfig) -> Option<Region> {
})
}

async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client {
pub(crate) async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client {
let aws_config = get_aws_config().await;
let credentials_provider =
get_credentials_provider(s3_storage_config).or(aws_config.credentials_provider());
Expand Down Expand Up @@ -155,13 +155,13 @@ async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client {
}

impl S3CompatibleObjectStorage {
/// Creates an object storage given a region and a bucket name.
/// Creates an object storage given a region, a bucket name and an S3 client.
pub async fn new(
s3_storage_config: &S3StorageConfig,
uri: Uri,
bucket: String,
s3_client: S3Client,
) -> Result<Self, StorageResolverError> {
let s3_client = create_s3_client(s3_storage_config).await;
let retry_params = RetryParams::aggressive();
let disable_multi_object_delete = s3_storage_config.disable_multi_object_delete;
let disable_multipart_upload = s3_storage_config.disable_multipart_upload;
Expand All @@ -181,12 +181,22 @@ impl S3CompatibleObjectStorage {
pub async fn from_uri(
s3_storage_config: &S3StorageConfig,
uri: &Uri,
) -> Result<Self, StorageResolverError> {
let s3_client = create_s3_client(s3_storage_config).await;
Self::from_uri_and_client(s3_storage_config, uri, s3_client).await
}

/// Creates an object storage given a region, an uri and an S3 client.
pub async fn from_uri_and_client(
s3_storage_config: &S3StorageConfig,
uri: &Uri,
s3_client: S3Client,
) -> Result<Self, StorageResolverError> {
let (bucket, prefix) = parse_s3_uri(uri).ok_or_else(|| {
let message = format!("failed to extract bucket name from S3 URI: {uri}");
StorageResolverError::InvalidUri(message)
})?;
let storage = Self::new(s3_storage_config, uri.clone(), bucket).await?;
let storage = Self::new(s3_storage_config, uri.clone(), bucket, s3_client).await?;
Ok(storage.with_prefix(prefix))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,31 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::sync::Arc;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use aws_sdk_s3::Client as S3Client;
use quickwit_common::uri::Uri;
use quickwit_config::{S3StorageConfig, StorageBackend};

use super::s3_compatible_storage::create_s3_client;
use crate::{
DebouncedStorage, S3CompatibleObjectStorage, Storage, StorageFactory, StorageResolverError,
};

/// S3 compatible object storage resolver.
pub struct S3CompatibleObjectStorageFactory {
storage_config: S3StorageConfig,
s3_client: Mutex<Option<S3Client>>,
}

impl S3CompatibleObjectStorageFactory {
/// Creates a new S3-compatible storage factory.
pub fn new(storage_config: S3StorageConfig) -> Self {
Self { storage_config }
Self {
storage_config,
s3_client: Mutex::new(None),
}
}
}

Expand All @@ -46,7 +52,17 @@ impl StorageFactory for S3CompatibleObjectStorageFactory {
}

async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
let storage = S3CompatibleObjectStorage::from_uri(&self.storage_config, uri).await?;
let s3_client_opt = self.s3_client.lock().unwrap().clone();
let s3_client = if let Some(s3_client) = s3_client_opt {
s3_client
} else {
let s3_client = create_s3_client(&self.storage_config).await;
*self.s3_client.lock().unwrap() = Some(s3_client.clone());
s3_client
};
let storage =
S3CompatibleObjectStorage::from_uri_and_client(&self.storage_config, uri, s3_client)
.await?;
Ok(Arc::new(DebouncedStorage::new(storage)))
}
}

0 comments on commit 68207c4

Please sign in to comment.