Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

memoize s3 client #5377

Merged
merged 5 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ fn get_region(s3_storage_config: &S3StorageConfig) -> Option<Region> {
})
}

async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client {
pub(crate) async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client {
let aws_config = get_aws_config().await;
let credentials_provider =
get_credentials_provider(s3_storage_config).or(aws_config.credentials_provider());
Expand Down Expand Up @@ -155,13 +155,13 @@ async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client {
}

impl S3CompatibleObjectStorage {
/// Creates an object storage given a region and a bucket name.
/// Creates an object storage given a region, a bucket name and an S3 client.
pub async fn new(
s3_storage_config: &S3StorageConfig,
uri: Uri,
bucket: String,
s3_client: S3Client,
) -> Result<Self, StorageResolverError> {
let s3_client = create_s3_client(s3_storage_config).await;
let retry_params = RetryParams::aggressive();
let disable_multi_object_delete = s3_storage_config.disable_multi_object_delete;
let disable_multipart_upload = s3_storage_config.disable_multipart_upload;
Expand All @@ -181,12 +181,22 @@ impl S3CompatibleObjectStorage {
pub async fn from_uri(
s3_storage_config: &S3StorageConfig,
uri: &Uri,
) -> Result<Self, StorageResolverError> {
let s3_client = create_s3_client(s3_storage_config).await;
Self::from_uri_and_client(s3_storage_config, uri, s3_client).await
}

/// Creates an object storage given a region, an uri and an S3 client.
pub async fn from_uri_and_client(
rdettai marked this conversation as resolved.
Show resolved Hide resolved
s3_storage_config: &S3StorageConfig,
uri: &Uri,
s3_client: S3Client,
) -> Result<Self, StorageResolverError> {
let (bucket, prefix) = parse_s3_uri(uri).ok_or_else(|| {
let message = format!("failed to extract bucket name from S3 URI: {uri}");
StorageResolverError::InvalidUri(message)
})?;
let storage = Self::new(s3_storage_config, uri.clone(), bucket).await?;
let storage = Self::new(s3_storage_config, uri.clone(), bucket, s3_client).await?;
Ok(storage.with_prefix(prefix))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,31 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::sync::Arc;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use aws_sdk_s3::Client as S3Client;
use quickwit_common::uri::Uri;
use quickwit_config::{S3StorageConfig, StorageBackend};

use super::s3_compatible_storage::create_s3_client;
use crate::{
DebouncedStorage, S3CompatibleObjectStorage, Storage, StorageFactory, StorageResolverError,
};

/// S3 compatible object storage resolver.
pub struct S3CompatibleObjectStorageFactory {
storage_config: S3StorageConfig,
s3_client: Mutex<Option<S3Client>>,
rdettai marked this conversation as resolved.
Show resolved Hide resolved
}

impl S3CompatibleObjectStorageFactory {
/// Creates a new S3-compatible storage factory.
pub fn new(storage_config: S3StorageConfig) -> Self {
Self { storage_config }
Self {
storage_config,
s3_client: Mutex::new(None),
}
}
}

Expand All @@ -46,7 +52,17 @@ impl StorageFactory for S3CompatibleObjectStorageFactory {
}

async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
let storage = S3CompatibleObjectStorage::from_uri(&self.storage_config, uri).await?;
let s3_client_opt = self.s3_client.lock().unwrap().clone();
let s3_client = if let Some(s3_client) = s3_client_opt {
s3_client
} else {
let s3_client = create_s3_client(&self.storage_config).await;
*self.s3_client.lock().unwrap() = Some(s3_client.clone());
s3_client
};
let storage =
S3CompatibleObjectStorage::from_uri_and_client(&self.storage_config, uri, s3_client)
.await?;
Ok(Arc::new(DebouncedStorage::new(storage)))
}
}
Loading