Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

memoize s3 client #5377

Merged
merged 5 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ fn get_region(s3_storage_config: &S3StorageConfig) -> Option<Region> {
})
}

async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client {
pub async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client {
let aws_config = get_aws_config().await;
let credentials_provider =
get_credentials_provider(s3_storage_config).or(aws_config.credentials_provider());
Expand Down Expand Up @@ -155,41 +155,40 @@ async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client {
}

impl S3CompatibleObjectStorage {
/// Creates an object storage given a region and a bucket name.
pub async fn new(
/// Creates an object storage given a region and a URI.
pub async fn from_uri(
s3_storage_config: &S3StorageConfig,
uri: Uri,
bucket: String,
uri: &Uri,
) -> Result<Self, StorageResolverError> {
let s3_client = create_s3_client(s3_storage_config).await;
Self::from_uri_and_client(s3_storage_config, uri, s3_client).await
}

/// Creates an object storage given a region, a URI and an S3 client.
pub async fn from_uri_and_client(
s3_storage_config: &S3StorageConfig,
uri: &Uri,
s3_client: S3Client,
) -> Result<Self, StorageResolverError> {
let (bucket, prefix) = parse_s3_uri(uri).ok_or_else(|| {
let message = format!("failed to extract bucket name from S3 URI: {uri}");
StorageResolverError::InvalidUri(message)
})?;
let retry_params = RetryParams::aggressive();
let disable_multi_object_delete = s3_storage_config.disable_multi_object_delete;
let disable_multipart_upload = s3_storage_config.disable_multipart_upload;
Ok(Self {
s3_client,
uri,
uri: uri.clone(),
bucket,
prefix: PathBuf::new(),
prefix,
multipart_policy: MultiPartPolicy::default(),
retry_params,
disable_multi_object_delete,
disable_multipart_upload,
})
}

/// Creates an object storage given a storage config and a URI.
///
/// The bucket and prefix are parsed from `uri` with `parse_s3_uri`; the storage is built with
/// `Self::new` and the parsed prefix is then applied through `with_prefix`.
///
/// NOTE(review): this page renders a diff without +/- markers; this definition appears to be
/// the pre-change `from_uri` — confirm before relying on it.
///
/// # Errors
///
/// Returns `StorageResolverError::InvalidUri` when `parse_s3_uri` yields `None`, and
/// propagates any error returned by `Self::new`.
pub async fn from_uri(
s3_storage_config: &S3StorageConfig,
uri: &Uri,
) -> Result<Self, StorageResolverError> {
let (bucket, prefix) = parse_s3_uri(uri).ok_or_else(|| {
let message = format!("failed to extract bucket name from S3 URI: {uri}");
StorageResolverError::InvalidUri(message)
})?;
// `Self::new` takes ownership of the URI, hence the clone of the borrowed `uri`.
let storage = Self::new(s3_storage_config, uri.clone(), bucket).await?;
Ok(storage.with_prefix(prefix))
}

/// Sets a specific prefix for all buckets.
///
/// This method overrides any existing prefix. (It does NOT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,34 @@
use std::sync::Arc;

use async_trait::async_trait;
use aws_sdk_s3::Client as S3Client;
use quickwit_common::uri::Uri;
use quickwit_config::{S3StorageConfig, StorageBackend};
use tokio::sync::OnceCell;

use super::s3_compatible_storage::create_s3_client;
use crate::{
DebouncedStorage, S3CompatibleObjectStorage, Storage, StorageFactory, StorageResolverError,
};

/// S3-compatible object storage resolver.
///
/// Holds the S3 storage configuration together with a lazily initialized S3 client that is
/// shared by the storages this factory builds.
pub struct S3CompatibleObjectStorageFactory {
// Configuration used both to create the S3 client and to build each storage.
storage_config: S3StorageConfig,
// The S3 client is cached so that it is not rebuilt every time a new storage is built
// (i.e. for every search query).
// It is not built in advance because we don't know whether this factory will end up being
// used at all: something like Azure, GCS, or even local files may be used instead.
s3_client: OnceCell<S3Client>,
}

impl S3CompatibleObjectStorageFactory {
/// Creates a new S3-compatible storage factory.
pub fn new(storage_config: S3StorageConfig) -> Self {
Self { storage_config }
Self {
storage_config,
s3_client: OnceCell::new(),
}
}
}

Expand All @@ -46,7 +58,14 @@ impl StorageFactory for S3CompatibleObjectStorageFactory {
}

/// Resolves `uri` into an S3-compatible storage wrapped in a `DebouncedStorage`.
///
/// The S3 client is obtained from the factory's `OnceCell` (initialized with
/// `create_s3_client` if it is still empty) and cloned for the resolved storage.
///
/// # Errors
///
/// Propagates the `StorageResolverError` returned by the storage constructors.
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
// NOTE(review): this page renders a diff without +/- markers; the next statement looks like
// the pre-change line. Its `storage` binding is shadowed by the one built further down.
let storage = S3CompatibleObjectStorage::from_uri(&self.storage_config, uri).await?;
// Fetch the cached client, creating it on the first call, then clone it for this storage.
let s3_client = self
.s3_client
.get_or_init(|| create_s3_client(&self.storage_config))
.await
.clone();
let storage =
S3CompatibleObjectStorage::from_uri_and_client(&self.storage_config, uri, s3_client)
.await?;
Ok(Arc::new(DebouncedStorage::new(storage)))
}
}
Loading