From d207e7277f73a7d0cef38f06a3ae60bec4898102 Mon Sep 17 00:00:00 2001 From: Paul Masurel <paul@quickwit.io> Date: Thu, 17 Oct 2024 15:04:33 +0900 Subject: [PATCH] Optionally prepending s3 compatible storage's key with a hash of the key. The point is to workaround S3 rate limiting. Since it is based on keys, our ULID naming scheme can lead to hotspot in the keyspace. This solution has a downside. External scripts listing files will have a their job multiplied. For this reason, the prefix cardinality is configurable. Closes #4824 --- quickwit/Cargo.lock | 1 + quickwit/Cargo.toml | 1 + .../quickwit-config/src/storage_config.rs | 37 +++++++- quickwit/quickwit-storage/Cargo.toml | 1 + .../object_storage/s3_compatible_storage.rs | 91 +++++++++++++++---- 5 files changed, 113 insertions(+), 18 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 721160f1ed4..735dc321972 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -6577,6 +6577,7 @@ dependencies = [ "lru", "md5", "mockall", + "murmurhash32", "once_cell", "opendal", "pin-project", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 6fcf67c2e10..4e34668f3a0 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -152,6 +152,7 @@ matches = "0.1.9" md5 = "0.7" mime_guess = "2.0.4" mockall = "0.11" +murmurhash32 = "0.3" mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "306c0a7" } new_string_template = "1.5.1" nom = "7.1.3" diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index 651271d0c61..9c50e55af48 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -26,6 +26,7 @@ use itertools::Itertools; use quickwit_common::get_bool_from_env; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, EnumMap}; +use tracing::warn; /// Lists the storage backends supported by Quickwit. #[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] @@ -93,6 +94,9 @@ impl StorageConfigs { } pub fn validate(&self) -> anyhow::Result<()> { + for storage_config in self.0.iter() { + storage_config.validate()?; + } let backends: Vec<StorageBackend> = self .0 .iter() @@ -216,6 +220,14 @@ impl StorageConfig { _ => None, } } + + pub fn validate(&self) -> anyhow::Result<()> { + if let StorageConfig::S3(config) = self { + config.validate() + } else { + Ok(()) + } + } } impl From<AzureStorageConfig> for StorageConfig { @@ -313,6 +325,8 @@ impl fmt::Debug for AzureStorageConfig { } } +const MAX_S3_HASH_PREFIX_CARDINALITY: usize = 16usize.pow(3); + #[derive(Clone, Default, Eq, PartialEq, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct S3StorageConfig { @@ -334,9 +348,30 @@ pub struct S3StorageConfig { pub disable_multi_object_delete: bool, #[serde(default)] pub disable_multipart_upload: bool, + #[serde(default)] + #[serde(skip_serializing_if = "lower_than_2")] + pub hash_prefix_cardinality: usize, +} + +fn lower_than_2(n: &usize) -> bool { + *n < 2 } impl S3StorageConfig { + fn validate(&self) -> anyhow::Result<()> { + if self.hash_prefix_cardinality == 1 { + warn!("A hash prefix of 1 will be ignored"); + } + if self.hash_prefix_cardinality > MAX_S3_HASH_PREFIX_CARDINALITY { + anyhow::bail!( + "hash_prefix_cardinality can take values of at most \ + {MAX_S3_HASH_PREFIX_CARDINALITY}, currently set to {}", + self.hash_prefix_cardinality + ); + } + Ok(()) + } + fn apply_flavor(&mut self) { match self.flavor { Some(StorageBackendFlavor::DigitalOcean) => { @@ -383,7 +418,7 @@ impl S3StorageConfig { } impl fmt::Debug for S3StorageConfig { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("S3StorageConfig") .field("access_key_id", &self.access_key_id) .field( diff --git a/quickwit/quickwit-storage/Cargo.toml b/quickwit/quickwit-storage/Cargo.toml index c883c61c265..385ec152683 100644 --- a/quickwit/quickwit-storage/Cargo.toml +++ b/quickwit/quickwit-storage/Cargo.toml @@ -26,6 +26,7 @@ once_cell = { workspace = true } pin-project = { workspace = true } rand = { workspace = true } regex = { workspace = true } +murmurhash32 = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tantivy = { workspace = true } diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 86ef692c671..215f09d2d64 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -86,11 +86,13 @@ pub struct S3CompatibleObjectStorage { s3_client: S3Client, uri: Uri, bucket: String, - prefix: PathBuf, + prefix: String, multipart_policy: MultiPartPolicy, retry_params: RetryParams, disable_multi_object_delete: bool, disable_multipart_upload: bool, + // If 0, we don't have any prefix + hash_prefix_cardinality: usize, } impl fmt::Debug for S3CompatibleObjectStorage { @@ -99,6 +101,7 @@ impl fmt::Debug for S3CompatibleObjectStorage { .debug_struct("S3CompatibleObjectStorage") .field("bucket", &self.bucket) .field("prefix", &self.prefix) + .field("hash_prefix_cardinality", &self.hash_prefix_cardinality) .finish() } } @@ -181,11 +184,12 @@ impl S3CompatibleObjectStorage { s3_client, uri: uri.clone(), bucket, - prefix, + prefix: prefix.to_string_lossy().to_string(), multipart_policy: MultiPartPolicy::default(), retry_params, disable_multi_object_delete, disable_multipart_upload, + hash_prefix_cardinality: s3_storage_config.hash_prefix_cardinality, }) } @@ -193,7 +197,7 @@ impl S3CompatibleObjectStorage { /// /// This method overrides any existing prefix. (It does NOT /// append the argument to any existing prefix.) - pub fn with_prefix(self, prefix: PathBuf) -> Self { + pub fn with_prefix(self, prefix: String) -> Self { Self { s3_client: self.s3_client, uri: self.uri, @@ -203,6 +207,7 @@ impl S3CompatibleObjectStorage { retry_params: self.retry_params, disable_multi_object_delete: self.disable_multi_object_delete, disable_multipart_upload: self.disable_multipart_upload, + hash_prefix_cardinality: self.hash_prefix_cardinality, } } @@ -262,12 +267,49 @@ async fn compute_md5<T: AsyncRead + std::marker::Unpin>(mut read: T) -> io::Resu } } } +const HEX_ALPHABET: [u8; 16] = *b"0123456789abcdef"; +const UNINITIALIZED_HASH_PREFIX: &str = "00000000"; + +fn build_key(prefix: &str, relative_path: &str, hash_prefix_cardinality: usize) -> String { + let mut key = String::with_capacity( + UNINITIALIZED_HASH_PREFIX.len() + 1 + prefix.len() + 1 + relative_path.len(), + ); + if hash_prefix_cardinality > 1 { + key.push_str(UNINITIALIZED_HASH_PREFIX); + key.push('/'); + } + key.push_str(prefix); + if key.as_bytes().last().copied() != Some(b'/') { + key.push('/'); + } + key.push_str(relative_path); + // We then set up the prefix. + if hash_prefix_cardinality > 1 { + let key_without_prefix = &key.as_bytes()[UNINITIALIZED_HASH_PREFIX.len() + 1..]; + let mut prefix_hash: usize = + murmurhash32::murmurhash3(key_without_prefix) as usize % hash_prefix_cardinality; + unsafe { + let prefix_buf: &mut [u8] = &mut key.as_bytes_mut()[..UNINITIALIZED_HASH_PREFIX.len()]; + for prefix_byte in prefix_buf { + let hex: u8 = HEX_ALPHABET[(prefix_hash % 16) as usize]; + *prefix_byte = hex; + if prefix_hash < 16 { + break; + } + prefix_hash /= 16; + } + } + } + key +} impl S3CompatibleObjectStorage { fn key(&self, relative_path: &Path) -> String { - // FIXME: This may not work on Windows. - let key_path = self.prefix.join(relative_path); - key_path.to_string_lossy().to_string() + build_key( + &self.prefix, + relative_path.to_string_lossy().as_ref(), + self.hash_prefix_cardinality, + ) } fn relative_path(&self, key: &str) -> PathBuf { @@ -945,13 +987,13 @@ mod tests { let s3_client = S3Client::new(&sdk_config); let uri = Uri::for_test("s3://bucket/indexes"); let bucket = "bucket".to_string(); - let prefix = PathBuf::new(); let mut s3_storage = S3CompatibleObjectStorage { s3_client, uri, bucket, - prefix, + prefix: String::new(), + hash_prefix_cardinality: 0, multipart_policy: MultiPartPolicy::default(), retry_params: RetryParams::for_test(), disable_multi_object_delete: false, @@ -962,7 +1004,7 @@ mod tests { PathBuf::from("indexes/foo") ); - s3_storage.prefix = PathBuf::from("indexes"); + s3_storage.prefix = "indexes".to_string(); assert_eq!( s3_storage.relative_path("indexes/foo"), @@ -1000,13 +1042,13 @@ mod tests { let s3_client = S3Client::from_conf(config); let uri = Uri::for_test("s3://bucket/indexes"); let bucket = "bucket".to_string(); - let prefix = PathBuf::new(); let s3_storage = S3CompatibleObjectStorage { s3_client, uri, bucket, - prefix, + prefix: String::new(), + hash_prefix_cardinality: 0, multipart_policy: MultiPartPolicy::default(), retry_params: RetryParams::for_test(), disable_multi_object_delete: true, @@ -1041,13 +1083,13 @@ mod tests { let s3_client = S3Client::from_conf(config); let uri = Uri::for_test("s3://bucket/indexes"); let bucket = "bucket".to_string(); - let prefix = PathBuf::new(); let s3_storage = S3CompatibleObjectStorage { s3_client, uri, bucket, - prefix, + prefix: String::new(), + hash_prefix_cardinality: 0, multipart_policy: MultiPartPolicy::default(), retry_params: RetryParams::for_test(), disable_multi_object_delete: false, @@ -1123,13 +1165,13 @@ mod tests { let s3_client = S3Client::from_conf(config); let uri = Uri::for_test("s3://bucket/indexes"); let bucket = "bucket".to_string(); - let prefix = PathBuf::new(); let s3_storage = S3CompatibleObjectStorage { s3_client, uri, bucket, - prefix, + prefix: String::new(), + hash_prefix_cardinality: 0, multipart_policy: MultiPartPolicy::default(), retry_params: RetryParams::for_test(), disable_multi_object_delete: false, @@ -1216,13 +1258,13 @@ mod tests { let s3_client = S3Client::from_conf(config); let uri = Uri::for_test("s3://bucket/indexes"); let bucket = "bucket".to_string(); - let prefix = PathBuf::new(); let s3_storage = S3CompatibleObjectStorage { s3_client, uri, bucket, - prefix, + prefix: String::new(), + hash_prefix_cardinality: 0, multipart_policy: MultiPartPolicy::default(), retry_params: RetryParams::for_test(), disable_multi_object_delete: false, @@ -1233,4 +1275,19 @@ mod tests { .await .unwrap(); } + + #[test] + fn test_build_key() { + assert_eq!(build_key("hello", "coucou", 0), "hello/coucou"); + assert_eq!(build_key("hello/", "coucou", 0), "hello/coucou"); + assert_eq!(build_key("hello/", "coucou", 1), "hello/coucou"); + assert_eq!(build_key("hello", "coucou", 1), "hello/coucou"); + assert_eq!(build_key("hello/", "coucou", 2), "10000000/hello/coucou"); + assert_eq!(build_key("hello", "coucou", 2), "10000000/hello/coucou"); + assert_eq!(build_key("hello/", "coucou", 16), "d0000000/hello/coucou"); + assert_eq!(build_key("hello", "coucou", 16), "d0000000/hello/coucou"); + assert_eq!(build_key("hello/", "coucou", 17), "50000000/hello/coucou"); + assert_eq!(build_key("hello", "coucou", 17), "50000000/hello/coucou"); + assert_eq!(build_key("hello/", "coucou", 70), "f0000000/hello/coucou"); + } }