-
Notifications
You must be signed in to change notification settings - Fork 456
/
Copy pathmod.rs
225 lines (195 loc) · 7.12 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
//! Object storage backend abstraction layer for Delta Table transaction logs and data
use dashmap::DashMap;
use object_store::limit::LimitStore;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use url::Url;
pub mod file;
pub mod retry_ext;
pub mod utils;
use crate::{DeltaResult, DeltaTableError};
pub use object_store;
use object_store::local::LocalFileSystem;
use object_store::memory::InMemory;
pub use object_store::path::{Path, DELIMITER};
use object_store::prefix::PrefixStore;
pub use object_store::{
DynObjectStore, Error as ObjectStoreError, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Result as ObjectStoreResult,
};
pub use retry_ext::ObjectStoreRetryExt;
pub use utils::*;
// Relative object-store path of the `_delta_log` directory. `commit_uri_from_version`
// builds commit file paths (e.g. `_delta_log/00000000000000000001.json`) as children of it.
// Initialized lazily on first dereference.
lazy_static! {
    static ref DELTA_LOG_PATH: Path = Path::from("_delta_log");
}
/// Sharable reference to [`ObjectStore`]
pub type ObjectStoreRef = Arc<DynObjectStore>;
/// Factory trait for creating [ObjectStoreRef] instances at runtime
///
/// Implementations are registered per URL scheme in a [`FactoryRegistry`] and must be
/// `Send + Sync` so a single factory can be shared across threads.
pub trait ObjectStoreFactory: Send + Sync {
    /// Build an object store for `url`, configured by `options`.
    ///
    /// Returns the store paired with a [`Path`] derived from `url`. How that path relates
    /// to the returned store (for instance, whether it has already been applied as a
    /// prefix) is implementation-specific.
    ///
    /// # Errors
    ///
    /// Implementations return an error when they cannot build a store for `url`, such as
    /// an unsupported scheme or an unusable location.
    fn parse_url_opts(
        &self,
        url: &Url,
        options: &StorageOptions,
    ) -> DeltaResult<(ObjectStoreRef, Path)>;
}
/// Built-in [`ObjectStoreFactory`] handling the `memory://` and `file://` URL schemes.
///
/// Any other scheme is rejected with [`DeltaTableError::InvalidTableLocation`].
#[derive(Clone, Debug, Default)]
pub(crate) struct DefaultObjectStoreFactory {}

impl ObjectStoreFactory for DefaultObjectStoreFactory {
    fn parse_url_opts(
        &self,
        url: &Url,
        options: &StorageOptions,
    ) -> DeltaResult<(ObjectStoreRef, Path)> {
        match url.scheme() {
            "memory" => {
                let path = Path::from_url_path(url.path())?;
                // Every call creates a brand-new, empty in-memory store.
                let inner = Arc::new(InMemory::new()) as ObjectStoreRef;
                let store = limit_store_handler(url_prefix_handler(inner, path.clone()), options);
                Ok((store, path))
            }
            "file" => {
                // `to_file_path` fails for URLs that do not map to a local path (e.g. a
                // non-local host). The URL is caller-supplied, so report an invalid
                // location instead of panicking.
                let root = url
                    .to_file_path()
                    .map_err(|_| DeltaTableError::InvalidTableLocation(url.clone().into()))?;
                let inner = Arc::new(LocalFileSystem::new_with_prefix(root)?) as ObjectStoreRef;
                let store = limit_store_handler(inner, options);
                Ok((store, Path::from("/")))
            }
            _ => Err(DeltaTableError::InvalidTableLocation(url.clone().into())),
        }
    }
}
/// Registry mapping a scheme-only URL (e.g. `memory://`) to the [`ObjectStoreFactory`]
/// responsible for that scheme.
///
/// Clones are cheap and share the same underlying map through the [`Arc`].
pub type FactoryRegistry = Arc<DashMap<Url, Arc<dyn ObjectStoreFactory>>>;

/// Return a handle to the process-wide [`FactoryRegistry`].
///
/// The registry is created lazily on first access and pre-populated with the built-in
/// default factory for the `memory://` and `file://` schemes. Every call returns a clone
/// of the same shared `Arc`, so factories inserted through one handle are visible through
/// all others.
pub fn factories() -> FactoryRegistry {
    static REGISTRY: OnceLock<FactoryRegistry> = OnceLock::new();
    REGISTRY
        .get_or_init(|| {
            let registry = FactoryRegistry::default();
            for scheme in ["memory://", "file://"] {
                registry.insert(
                    Url::parse(scheme).expect("built-in scheme URL literal must parse"),
                    Arc::new(DefaultObjectStoreFactory::default()),
                );
            }
            registry
        })
        .clone()
}
/// Simpler access pattern for the [FactoryRegistry] to get a single store
///
/// Looks up the factory registered for `url`'s scheme and asks it to build a store with
/// default [`StorageOptions`]. The path the factory returns alongside the store is
/// discarded.
///
/// # Errors
///
/// Returns [`DeltaTableError::InvalidTableLocation`] when no factory is registered for the
/// scheme, and propagates any error produced by the factory itself.
pub fn store_for(url: &Url) -> DeltaResult<ObjectStoreRef> {
    let scheme_key = Url::parse(&format!("{}://", url.scheme())).unwrap();
    // Keep the registry handle alive so the entry guard below can borrow from it.
    let registry = factories();
    let Some(factory) = registry.get(&scheme_key) else {
        return Err(DeltaTableError::InvalidTableLocation(url.clone().into()));
    };
    factory
        .parse_url_opts(url, &StorageOptions::default())
        .map(|(store, _prefix)| store)
}
/// Options used for configuring backend storage
///
/// A newtype over a string-to-string map of option keys and values. Keys such as those in
/// [`storage_constants`] are looked up directly in the wrapped map.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct StorageOptions(pub HashMap<String, String>);

impl From<HashMap<String, String>> for StorageOptions {
    /// Wrap an existing option map without copying it.
    fn from(options: HashMap<String, String>) -> Self {
        StorageOptions(options)
    }
}
/// Return the uri of commit version.
///
/// The version is zero-padded to 20 digits and given a `.json` extension, then placed
/// under the `_delta_log` directory.
///
/// ```rust
/// # use deltalake_core::storage::*;
/// use object_store::path::Path;
/// let uri = commit_uri_from_version(1);
/// assert_eq!(uri, Path::from("_delta_log/00000000000000000001.json"));
/// ```
pub fn commit_uri_from_version(version: i64) -> Path {
    DELTA_LOG_PATH.child(format!("{version:020}.json").as_str())
}
/// Return true for all the stringly values typically associated with true
///
/// aka YAML booleans. Matching is ASCII case-insensitive against `1`, `true`, `on`,
/// `yes` and `y`. Surrounding whitespace is not trimmed.
///
/// ```rust
/// # use deltalake_core::storage::*;
/// for value in ["1", "true", "on", "YES", "Y"] {
///     assert!(str_is_truthy(value));
/// }
/// for value in ["0", "FALSE", "off", "NO", "n", "bork"] {
///     assert!(!str_is_truthy(value));
/// }
/// ```
pub fn str_is_truthy(val: &str) -> bool {
    const TRUTHY: [&str; 5] = ["1", "true", "on", "yes", "y"];
    TRUTHY
        .iter()
        .any(|candidate| val.eq_ignore_ascii_case(candidate))
}
/// Wrap `store` in a [PrefixStore] rooted at `prefix`, unless `prefix` is the root.
///
/// With a non-root prefix, list/get/etc. operations start from that prefix in the object
/// storage rather than from the root configured URI of the [ObjectStore]. A root prefix
/// (`"/"`) returns the store unchanged behind an [ObjectStoreRef].
pub fn url_prefix_handler<T: ObjectStore>(store: T, prefix: Path) -> ObjectStoreRef {
    let is_root = prefix == Path::from("/");
    if is_root {
        return Arc::new(store);
    }
    Arc::new(PrefixStore::new(store, prefix))
}
/// Simple function to wrap the given [ObjectStore] in a [LimitStore] if configured
///
/// Reads [`storage_constants::OBJECT_STORE_CONCURRENCY_LIMIT`] from `options`. When the
/// value, ignoring surrounding whitespace, is a positive integer, the store is wrapped so
/// that at most that many requests run against it concurrently.
///
/// A missing, unparseable, or zero value leaves the store unwrapped. A limit of zero
/// would otherwise make every request wait forever for a permit.
///
/// Reference [LimitStore](https://docs.rs/object_store/latest/object_store/limit/struct.LimitStore.html) for more information
pub fn limit_store_handler<T: ObjectStore>(store: T, options: &StorageOptions) -> ObjectStoreRef {
    let concurrency_limit = options
        .0
        .get(storage_constants::OBJECT_STORE_CONCURRENCY_LIMIT)
        .and_then(|v| v.trim().parse::<usize>().ok())
        // LimitStore gates requests on a semaphore sized to the limit. With zero permits
        // no request could ever proceed.
        .filter(|&limit| limit > 0);
    match concurrency_limit {
        Some(limit) => Arc::new(LimitStore::new(store, limit)),
        None => Arc::new(store),
    }
}
/// Storage option keys to use when creating [ObjectStore].
///
/// The same key should be used whether it is passed in the options hashmap or set as an
/// environment variable. Each storage provider is expected to honor these keys.
pub mod storage_constants {
    /// The number of concurrent connections the underlying object store can create
    ///
    /// Read by `limit_store_handler` and parsed as an integer request limit.
    /// Reference [LimitStore](https://docs.rs/object_store/latest/object_store/limit/struct.LimitStore.html) for more information
    pub const OBJECT_STORE_CONCURRENCY_LIMIT: &str = "OBJECT_STORE_CONCURRENCY_LIMIT";
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A non-root prefix must wrap the store in a `PrefixStore` with the leading slash removed.
    #[test]
    fn test_url_prefix_handler() {
        let prefix = Path::parse("/databases/foo/bar").expect("Failed to parse path");
        let prefixed = url_prefix_handler(InMemory::new(), prefix);
        assert_eq!(
            prefixed.to_string(),
            String::from("PrefixObjectStore(databases/foo/bar)")
        );
    }

    /// A configured concurrency limit must wrap the store in a `LimitStore` of that size.
    #[test]
    fn test_limit_store_handler() {
        let options = StorageOptions(HashMap::from([(
            String::from("OBJECT_STORE_CONCURRENCY_LIMIT"),
            String::from("500"),
        )]));
        let limited = limit_store_handler(InMemory::new(), &options);
        assert_eq!(
            limited.to_string(),
            String::from("LimitStore(500, InMemory)")
        );
    }
}