-
Notifications
You must be signed in to change notification settings - Fork 350
/
storage_resolver.rs
249 lines (227 loc) · 9.15 KB
/
storage_resolver.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use once_cell::sync::Lazy;
use quickwit_common::uri::{Protocol, Uri};
use quickwit_config::{StorageBackend, StorageConfigs};
use crate::local_file_storage::LocalFileStorageFactory;
use crate::ram_storage::RamStorageFactory;
#[cfg(feature = "azure")]
use crate::AzureBlobStorageFactory;
#[cfg(feature = "gcs")]
use crate::GoogleCloudStorageFactory;
use crate::{S3CompatibleObjectStorageFactory, Storage, StorageFactory, StorageResolverError};
/// Maps a URI to the [`Storage`] produced by the [`StorageFactory`] registered for the URI's
/// storage backend.
///
/// The resolver never builds storage objects itself. It only selects the factory matching the
/// URI's protocol and hands the URI over to it. The factory table lives behind an [`Arc`], so
/// clones of the resolver share the same registered factories.
#[derive(Clone)]
pub struct StorageResolver {
    /// Registered factories, keyed by the backend each one serves.
    per_backend_factories: Arc<HashMap<StorageBackend, Box<dyn StorageFactory>>>,
}
impl fmt::Debug for StorageResolver {
    /// Prints only the type name; the registered factories are intentionally not listed.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        let mut debug_struct = formatter.debug_struct("StorageResolver");
        debug_struct.finish()
    }
}
impl StorageResolver {
    /// Creates an empty [`StorageResolverBuilder`].
    pub fn builder() -> StorageResolverBuilder {
        StorageResolverBuilder::default()
    }

    /// Resolves the given URI to a [`Storage`] instance.
    ///
    /// The URI's protocol is mapped to a [`StorageBackend`]. The factory registered for that
    /// backend is then asked to build the storage for `uri`.
    ///
    /// # Errors
    ///
    /// Returns [`StorageResolverError::UnsupportedBackend`] in two cases:
    /// - the URI's protocol does not correspond to any storage backend;
    /// - no factory is registered for the matching backend.
    ///
    /// Any error returned by the selected factory is propagated as is.
    pub async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
        let backend = match uri.protocol() {
            Protocol::Azure => StorageBackend::Azure,
            Protocol::File => StorageBackend::File,
            Protocol::Ram => StorageBackend::Ram,
            Protocol::S3 => StorageBackend::S3,
            Protocol::Google => StorageBackend::Google,
            _ => {
                let message = format!(
                    "Quickwit does not support {} as a storage backend",
                    uri.protocol()
                );
                return Err(StorageResolverError::UnsupportedBackend(message));
            }
        };
        // Build the error lazily. `ok_or` would format the message on every call, including
        // the common case where a factory is found.
        let storage_factory = self.per_backend_factories.get(&backend).ok_or_else(|| {
            let message = format!("no storage factory is registered for {}", uri.protocol());
            StorageResolverError::UnsupportedBackend(message)
        })?;
        let storage = storage_factory.resolve(uri).await?;
        Ok(storage)
    }

    /// Returns a [`StorageResolver`] built from [`StorageConfigs::default`].
    ///
    /// The resolver is created once, on first call, and stored in a process-wide static. Every
    /// call returns a clone of that instance.
    ///
    /// Note that if the environment (env vars, instance metadata, ...) fails to provide the
    /// necessary credentials, the default cloud storages (e.g. Azure or S3) returned by this
    /// resolver will not work.
    pub fn unconfigured() -> Self {
        static STORAGE_RESOLVER: Lazy<StorageResolver> = Lazy::new(|| {
            let storage_configs = StorageConfigs::default();
            StorageResolver::configured(&storage_configs)
        });
        STORAGE_RESOLVER.clone()
    }

    /// Creates and returns a [`StorageResolver`] configured from `storage_configs`.
    ///
    /// Registered factories:
    /// - local file storage and RAM storage;
    /// - S3-compatible object storage, using the S3 config if present and the default otherwise;
    /// - Azure Blob Storage (with the `azure` feature) and Google Cloud Storage (with the `gcs`
    ///   feature), configured the same way.
    ///
    /// When a cloud feature is disabled, an `UnsupportedStorage` placeholder is registered for
    /// that backend instead, carrying a message that names the missing feature.
    ///
    /// # Panics
    ///
    /// Panics if [`StorageResolverBuilder::build`] returns an error.
    pub fn configured(storage_configs: &StorageConfigs) -> Self {
        let mut builder = StorageResolver::builder()
            .register(LocalFileStorageFactory)
            .register(RamStorageFactory::default())
            .register(S3CompatibleObjectStorageFactory::new(
                storage_configs.find_s3().cloned().unwrap_or_default(),
            ));
        #[cfg(feature = "azure")]
        {
            builder = builder.register(AzureBlobStorageFactory::new(
                storage_configs.find_azure().cloned().unwrap_or_default(),
            ));
        }
        #[cfg(not(feature = "azure"))]
        {
            use crate::storage_factory::UnsupportedStorage;
            builder = builder.register(UnsupportedStorage::new(
                StorageBackend::Azure,
                "Quickwit was compiled without the `azure` feature",
            ))
        }
        #[cfg(feature = "gcs")]
        {
            builder = builder.register(GoogleCloudStorageFactory::new(
                storage_configs.find_google().cloned().unwrap_or_default(),
            ));
        }
        #[cfg(not(feature = "gcs"))]
        {
            use crate::storage_factory::UnsupportedStorage;
            builder = builder.register(UnsupportedStorage::new(
                StorageBackend::Google,
                "Quickwit was compiled without the `gcs` feature",
            ))
        }
        builder
            .build()
            .expect("storage factory and config backends should match")
    }

    /// Returns a [`StorageResolver`] for testing purposes, with only the RAM and local file
    /// factories registered.
    ///
    /// Unlike [`StorageResolver::unconfigured`], this resolver is not a singleton: each call
    /// builds a new one.
    #[cfg(any(test, feature = "testsuite"))]
    pub fn for_test() -> Self {
        StorageResolver::builder()
            .register(RamStorageFactory::default())
            .register(LocalFileStorageFactory)
            .build()
            .expect("storage factory and config backends should match")
    }
}
/// Builder that collects [`StorageFactory`] instances, at most one per [`StorageBackend`], and
/// turns them into a [`StorageResolver`].
///
/// Obtained through [`StorageResolver::builder`] or [`Default::default`].
#[derive(Default)]
pub struct StorageResolverBuilder {
    /// Factories registered so far, keyed by the backend each one serves.
    per_backend_factories: HashMap<StorageBackend, Box<dyn StorageFactory>>,
}
impl StorageResolverBuilder {
    /// Registers a [`StorageFactory`] under the backend reported by its `backend()` method.
    ///
    /// If a factory is already registered for the same backend, it is replaced, so the last
    /// registration wins.
    ///
    /// The builder is taken and returned by value, so the returned builder must be kept.
    /// Discarding it silently drops the registration.
    #[must_use]
    pub fn register<S: StorageFactory>(mut self, storage_factory: S) -> Self {
        let backend = storage_factory.backend();
        self.per_backend_factories
            .insert(backend, Box::new(storage_factory));
        self
    }

    /// Builds the [`StorageResolver`] from the factories registered so far.
    ///
    /// # Errors
    ///
    /// No validation is performed at the moment, so this always returns `Ok`.
    pub fn build(self) -> anyhow::Result<StorageResolver> {
        let storage_resolver = StorageResolver {
            per_backend_factories: Arc::new(self.per_backend_factories),
        };
        Ok(storage_resolver)
    }
}
#[cfg(test)]
mod tests {
    use std::path::Path;

    use super::*;
    use crate::{MockStorageFactory, RamStorage};

    /// Resolving a `ram://` URI dispatches to the factory registered for the RAM backend, not to
    /// the other registered factories.
    #[tokio::test]
    async fn test_storage_resolver_simple() -> anyhow::Result<()> {
        let mut file_storage_factory = MockStorageFactory::new();
        file_storage_factory
            .expect_backend()
            .returning(|| StorageBackend::File);
        let mut ram_storage_factory = MockStorageFactory::new();
        ram_storage_factory
            .expect_backend()
            .returning(|| StorageBackend::Ram);
        ram_storage_factory.expect_resolve().returning(|_uri| {
            Ok(Arc::new(
                RamStorage::builder()
                    .put("hello", b"hello_content_second")
                    .build(),
            ))
        });
        let storage_resolver = StorageResolver::builder()
            .register(file_storage_factory)
            .register(ram_storage_factory)
            .build()
            .unwrap();
        let storage = storage_resolver.resolve(&Uri::for_test("ram:///")).await?;
        let data = storage.get_all(Path::new("hello")).await?;
        assert_eq!(&data[..], b"hello_content_second");
        Ok(())
    }

    /// When two factories are registered for the same backend, the last one registered is used.
    #[tokio::test]
    async fn test_storage_resolver_override() -> anyhow::Result<()> {
        let mut first_ram_storage_factory = MockStorageFactory::new();
        first_ram_storage_factory
            .expect_backend()
            .returning(|| StorageBackend::Ram);
        let mut second_ram_storage_factory = MockStorageFactory::new();
        second_ram_storage_factory
            .expect_backend()
            .returning(|| StorageBackend::Ram);
        second_ram_storage_factory
            .expect_resolve()
            .returning(|uri| {
                assert_eq!(uri.as_str(), "ram:///home");
                Ok(Arc::new(
                    RamStorage::builder()
                        .put("hello", b"hello_content_second")
                        .build(),
                ))
            });
        let storage_resolver = StorageResolver::builder()
            .register(first_ram_storage_factory)
            .register(second_ram_storage_factory)
            .build()
            .unwrap();
        let storage = storage_resolver
            .resolve(&Uri::for_test("ram:///home"))
            .await?;
        let data = storage.get_all(Path::new("hello")).await?;
        assert_eq!(&data[..], b"hello_content_second");
        Ok(())
    }

    /// A non-storage protocol (here PostgreSQL) is rejected with `UnsupportedBackend`.
    #[tokio::test]
    async fn test_storage_resolver_unsupported_protocol() {
        let storage_resolver = StorageResolver::unconfigured();
        let storage_uri = Uri::for_test("postgresql://localhost:5432/metastore");
        let resolver_error = storage_resolver.resolve(&storage_uri).await.unwrap_err();
        assert!(matches!(
            resolver_error,
            StorageResolverError::UnsupportedBackend(_)
        ));
    }

    /// A storage protocol whose backend has no registered factory is also rejected with
    /// `UnsupportedBackend`.
    #[tokio::test]
    async fn test_storage_resolver_no_registered_factory() {
        let storage_resolver = StorageResolver::builder().build().unwrap();
        let resolver_error = storage_resolver
            .resolve(&Uri::for_test("ram:///"))
            .await
            .unwrap_err();
        assert!(matches!(
            resolver_error,
            StorageResolverError::UnsupportedBackend(_)
        ));
    }
}