From c1aab14afee35d9638129d6517a45302c3d92d2a Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 9 Jan 2025 16:46:11 -0600 Subject: [PATCH 1/7] u64 ranges --- object_store/src/lib.rs | 12 ++++++------ object_store/src/limit.rs | 4 ++-- object_store/src/memory.rs | 23 +++++++++++++---------- object_store/src/prefix.rs | 4 ++-- object_store/src/throttle.rs | 6 +++--- object_store/src/util.rs | 33 +++++++++++++++++---------------- 6 files changed, 43 insertions(+), 39 deletions(-) diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 53eda5a82fd..be337061d62 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -630,7 +630,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// in the given byte range. /// /// See [`GetRange::Bounded`] for more details on how `range` gets interpreted - async fn get_range(&self, location: &Path, range: Range) -> Result { + async fn get_range(&self, location: &Path, range: Range) -> Result { let options = GetOptions { range: Some(range.into()), ..Default::default() @@ -640,7 +640,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// Return the bytes that are stored at the specified location /// in the given byte ranges - async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { + async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { coalesce_ranges( ranges, |range| self.get_range(location, range), @@ -820,14 +820,14 @@ macro_rules! as_ref_impl { self.as_ref().get_opts(location, options).await } - async fn get_range(&self, location: &Path, range: Range) -> Result { + async fn get_range(&self, location: &Path, range: Range) -> Result { self.as_ref().get_range(location, range).await } async fn get_ranges( &self, location: &Path, - ranges: &[Range], + ranges: &[Range], ) -> Result> { self.as_ref().get_ranges(location, ranges).await } @@ -904,7 +904,7 @@ pub struct ObjectMeta { /// The last modified time pub last_modified: DateTime, /// The size in bytes of the object - pub size: usize, + pub size: u64, /// The unique identifier for the object /// /// @@ -1019,7 +1019,7 @@ pub struct GetResult { /// The [`ObjectMeta`] for this object pub meta: ObjectMeta, /// The range of bytes returned by this request - pub range: Range, + pub range: Range, /// Additional object attributes pub attributes: Attributes, } diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index 77f72a0e11a..330a0da07f7 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -117,12 +117,12 @@ impl ObjectStore for LimitStore { Ok(permit_get_result(r, permit)) } - async fn get_range(&self, location: &Path, range: Range) -> Result { + async fn get_range(&self, location: &Path, range: Range) -> Result { let _permit = self.semaphore.acquire().await.unwrap(); self.inner.get_range(location, range).await } - async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { + async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { let _permit = self.semaphore.acquire().await.unwrap(); self.inner.get_ranges(location, ranges).await } diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 6402f924346..040c87d3951 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -241,7 +241,7 @@ impl ObjectStore for InMemory { let meta = ObjectMeta { location: location.clone(), last_modified: entry.last_modified, - size: entry.data.len(), + size: entry.data.len() as u64, e_tag: Some(e_tag), version: None, }; @@ -250,11 +250,14 @@ impl ObjectStore for InMemory { let (range, data) = match options.range { Some(range) => { let r = range - .as_range(entry.data.len()) + .as_range(entry.data.len() as u64) .map_err(|source| Error::Range { source })?; - (r.clone(), entry.data.slice(r)) + ( + r.clone(), + entry.data.slice(r.start as usize..r.end as usize), + ) } - None => (0..entry.data.len(), entry.data), + None => (0..entry.data.len() as u64, entry.data), }; let stream = futures::stream::once(futures::future::ready(Ok(data))); @@ -266,15 +269,15 @@ impl ObjectStore for InMemory { }) } - async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { + async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { let entry = self.entry(location).await?; ranges .iter() .map(|range| { let r = GetRange::Bounded(range.clone()) - .as_range(entry.data.len()) + .as_range(entry.data.len() as u64) .map_err(|source| Error::Range { source })?; - + let r = r.start as usize..r.end as usize; Ok(entry.data.slice(r)) }) .collect() @@ -286,7 +289,7 @@ impl ObjectStore for InMemory { Ok(ObjectMeta { location: location.clone(), last_modified: entry.last_modified, - size: entry.data.len(), + size: entry.data.len() as u64, e_tag: Some(entry.e_tag.to_string()), version: None, }) @@ -316,7 +319,7 @@ impl ObjectStore for InMemory { Ok(ObjectMeta { location: key.clone(), last_modified: value.last_modified, - size: value.data.len(), + size: value.data.len() as u64, e_tag: Some(value.e_tag.to_string()), version: None, }) @@ -361,7 +364,7 @@ impl ObjectStore for InMemory { let object = ObjectMeta { location: k.clone(), last_modified: v.last_modified, - size: v.data.len(), + size: v.data.len() as u64, e_tag: Some(v.e_tag.to_string()), version: None, }; diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index a0b67ca4b58..ac9803ec0d9 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -132,7 +132,7 @@ impl ObjectStore for PrefixStore { self.inner.get(&full_path).await } - async fn get_range(&self, location: &Path, range: Range) -> Result { + async fn get_range(&self, location: &Path, range: Range) -> Result { let full_path = self.full_path(location); self.inner.get_range(&full_path, range).await } @@ -142,7 +142,7 @@ impl ObjectStore for PrefixStore { self.inner.get_opts(&full_path, options).await } - async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { + async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { let full_path = self.full_path(location); self.inner.get_ranges(&full_path, ranges).await } diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index 29cd32705cc..6586ba94684 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -203,7 +203,7 @@ impl ObjectStore for ThrottledStore { Ok(throttle_get(result, wait_get_per_byte)) } - async fn get_range(&self, location: &Path, range: Range) -> Result { + async fn get_range(&self, location: &Path, range: Range) -> Result { let config = self.config(); let sleep_duration = @@ -214,10 +214,10 @@ impl ObjectStore for ThrottledStore { self.inner.get_range(location, range).await } - async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { + async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { let config = self.config(); - let total_bytes: usize = ranges.iter().map(|range| range.end - range.start).sum(); + let total_bytes: u64 = ranges.iter().map(|range| range.end - range.start).sum(); let sleep_duration = config.wait_get_per_call + config.wait_get_per_byte * total_bytes as u32; diff --git a/object_store/src/util.rs b/object_store/src/util.rs index 6d638f3cb2b..d823bb77491 100644 --- a/object_store/src/util.rs +++ b/object_store/src/util.rs @@ -49,7 +49,7 @@ pub(crate) fn hmac_sha256(secret: impl AsRef<[u8]>, bytes: impl AsRef<[u8]>) -> } /// Collect a stream into [`Bytes`] avoiding copying in the event of a single chunk -pub async fn collect_bytes(mut stream: S, size_hint: Option) -> Result +pub async fn collect_bytes(mut stream: S, size_hint: Option) -> Result where E: Send, S: Stream> + Send + Unpin, @@ -60,9 +60,9 @@ where match stream.next().await.transpose()? { None => Ok(first), Some(second) => { - let size_hint = size_hint.unwrap_or_else(|| first.len() + second.len()); + let size_hint = size_hint.unwrap_or_else(|| first.len() as u64 + second.len() as u64); - let mut buf = Vec::with_capacity(size_hint); + let mut buf = Vec::with_capacity(size_hint as usize); buf.extend_from_slice(&first); buf.extend_from_slice(&second); while let Some(maybe_bytes) = stream.next().await { @@ -89,7 +89,7 @@ where /// Range requests with a gap less than or equal to this, /// will be coalesced into a single request by [`coalesce_ranges`] -pub const OBJECT_STORE_COALESCE_DEFAULT: usize = 1024 * 1024; +pub const OBJECT_STORE_COALESCE_DEFAULT: u64 = 1024 * 1024; /// Up to this number of range requests will be performed in parallel by [`coalesce_ranges`] pub(crate) const OBJECT_STORE_COALESCE_PARALLEL: usize = 10; @@ -103,12 +103,12 @@ pub(crate) const OBJECT_STORE_COALESCE_PARALLEL: usize = 10; /// * Make multiple `fetch` requests in parallel (up to maximum of 10) /// pub async fn coalesce_ranges( - ranges: &[Range], + ranges: &[Range], fetch: F, - coalesce: usize, + coalesce: u64, ) -> Result, E> where - F: Send + FnMut(Range) -> Fut, + F: Send + FnMut(Range) -> Fut, E: Send, Fut: std::future::Future> + Send, { @@ -129,13 +129,14 @@ where let start = range.start - fetch_range.start; let end = range.end - fetch_range.start; - fetch_bytes.slice(start..end.min(fetch_bytes.len())) + let range = (start as usize)..(end as usize).min(fetch_bytes.len()); + fetch_bytes.slice(range) }) .collect()) } /// Returns a sorted list of ranges that cover `ranges` -fn merge_ranges(ranges: &[Range], coalesce: usize) -> Vec> { +fn merge_ranges(ranges: &[Range], coalesce: u64) -> Vec> { if ranges.is_empty() { return vec![]; } @@ -196,20 +197,20 @@ pub enum GetRange { /// an error will be returned. Additionally, if the range ends after the end /// of the object, the entire remainder of the object will be returned. /// Otherwise, the exact requested range will be returned. - Bounded(Range), + Bounded(Range), /// Request all bytes starting from a given byte offset - Offset(usize), + Offset(u64), /// Request up to the last n bytes - Suffix(usize), + Suffix(u64), } #[derive(Debug, thiserror::Error)] pub(crate) enum InvalidGetRange { #[error("Wanted range starting at {requested}, but object was only {length} bytes long")] - StartTooLarge { requested: usize, length: usize }, + StartTooLarge { requested: u64, length: u64 }, #[error("Range started at {start} and ended at {end}")] - Inconsistent { start: usize, end: usize }, + Inconsistent { start: u64, end: u64 }, } impl GetRange { @@ -227,7 +228,7 @@ impl GetRange { } /// Convert to a [`Range`] if valid. - pub(crate) fn as_range(&self, len: usize) -> Result, InvalidGetRange> { + pub(crate) fn as_range(&self, len: u64) -> Result, InvalidGetRange> { self.is_valid()?; match self { Self::Bounded(r) => { @@ -267,7 +268,7 @@ impl Display for GetRange { } } -impl> From for GetRange { +impl> From for GetRange { fn from(value: T) -> Self { use std::ops::Bound::*; let first = match value.start_bound() { From b1148021f1981e6b30e7719fec74a84e9e10484a Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 10 Jan 2025 09:02:58 -0600 Subject: [PATCH 2/7] more u64 --- object_store/src/chunked.rs | 12 +++--- object_store/src/integration.rs | 10 ++--- object_store/src/lib.rs | 6 +-- object_store/src/local.rs | 55 +++++++++++----------------- object_store/src/util.rs | 11 ++++-- object_store/tests/get_range_file.rs | 13 ++++--- 6 files changed, 51 insertions(+), 56 deletions(-) diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs index 4998e9f2a04..30fcf730226 100644 --- a/object_store/src/chunked.rs +++ b/object_store/src/chunked.rs @@ -44,12 +44,12 @@ use crate::{PutPayload, Result}; #[derive(Debug)] pub struct ChunkedStore { inner: Arc, - chunk_size: usize, + chunk_size: u64, } impl ChunkedStore { /// Creates a new [`ChunkedStore`] with the specified chunk_size - pub fn new(inner: Arc, chunk_size: usize) -> Self { + pub fn new(inner: Arc, chunk_size: u64) -> Self { Self { inner, chunk_size } } } @@ -100,7 +100,7 @@ impl ObjectStore for ChunkedStore { if exhausted { return None; } - while buffer.len() < chunk_size { + while buffer.len() < chunk_size as usize { match stream.next().await { None => { exhausted = true; @@ -125,7 +125,7 @@ impl ObjectStore for ChunkedStore { }; } // Return the chunked values as the next value in the stream - let slice = buffer.split_to(chunk_size).freeze(); + let slice = buffer.split_to(chunk_size as usize).freeze(); Some((Ok(slice), (stream, buffer, exhausted, chunk_size))) }, ) @@ -138,7 +138,7 @@ impl ObjectStore for ChunkedStore { }) } - async fn get_range(&self, location: &Path, range: Range) -> Result { + async fn get_range(&self, location: &Path, range: Range) -> Result { self.inner.get_range(location, range).await } @@ -203,7 +203,7 @@ mod tests { let mut remaining = 1001; while let Some(next) = s.next().await { - let size = next.unwrap().len(); + let size = next.unwrap().len() as u64; let expected = remaining.min(chunk_size); assert_eq!(size, expected); remaining -= expected; diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs index 20e95fddc47..25a929459ef 100644 --- a/object_store/src/integration.rs +++ b/object_store/src/integration.rs @@ -112,7 +112,7 @@ pub async fn put_get_delete_list(storage: &DynObjectStore) { let range_result = storage.get_range(&location, range.clone()).await; let bytes = range_result.unwrap(); - assert_eq!(bytes, data.slice(range.clone())); + assert_eq!(bytes, data.slice(range.start as usize..range.end as usize)); let opts = GetOptions { range: Some(GetRange::Bounded(2..5)), @@ -190,11 +190,11 @@ pub async fn put_get_delete_list(storage: &DynObjectStore) { let ranges = vec![0..1, 2..3, 0..5]; let bytes = storage.get_ranges(&location, &ranges).await.unwrap(); for (range, bytes) in ranges.iter().zip(bytes) { - assert_eq!(bytes, data.slice(range.clone())) + assert_eq!(bytes, data.slice(range.start as usize..range.end as usize)); } let head = storage.head(&location).await.unwrap(); - assert_eq!(head.size, data.len()); + assert_eq!(head.size, data.len() as u64); storage.delete(&location).await.unwrap(); @@ -934,7 +934,7 @@ pub async fn list_with_delimiter(storage: &DynObjectStore) { let object = &result.objects[0]; assert_eq!(object.location, expected_location); - assert_eq!(object.size, data.len()); + assert_eq!(object.size, data.len() as u64); // ==================== check: prefix-list `mydb/wb/000/000/001` (partial filename doesn't match) ==================== let prefix = Path::from("mydb/wb/000/000/001"); @@ -1085,7 +1085,7 @@ pub async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultipartStore .unwrap(); let meta = storage.head(&path).await.unwrap(); - assert_eq!(meta.size, chunk_size * 2); + assert_eq!(meta.size, chunk_size as u64 * 2); // Empty case let path = Path::from("test_empty_multipart"); diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index be337061d62..d56668b2bbf 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -234,7 +234,7 @@ //! //! // Buffer the entire object in memory //! let object: Bytes = result.bytes().await.unwrap(); -//! assert_eq!(object.len(), meta.size); +//! assert_eq!(object.len() as u64, meta.size); //! //! // Alternatively stream the bytes from object storage //! let stream = object_store.get(&path).await.unwrap().into_stream(); @@ -1060,7 +1060,7 @@ impl GetResult { path: path.clone(), })?; - let mut buffer = Vec::with_capacity(len); + let mut buffer = Vec::with_capacity(len as usize); file.take(len as _) .read_to_end(&mut buffer) .map_err(|source| local::Error::UnableToReadBytes { source, path })?; @@ -1093,7 +1093,7 @@ impl GetResult { match self.payload { #[cfg(all(feature = "fs", not(target_arch = "wasm32")))] GetResultPayload::File(file, path) => { - const CHUNK_SIZE: usize = 8 * 1024; + const CHUNK_SIZE: u64 = 8 * 1024; local::chunked_stream(file, path, self.range, CHUNK_SIZE) } GetResultPayload::Stream(s) => s, diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 364026459a0..d57ccd66c81 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -21,7 +21,7 @@ use std::io::{ErrorKind, Read, Seek, SeekFrom, Write}; use std::ops::Range; use std::sync::Arc; use std::time::SystemTime; -use std::{collections::BTreeSet, convert::TryFrom, io}; +use std::{collections::BTreeSet, io}; use std::{collections::VecDeque, path::PathBuf}; use async_trait::async_trait; @@ -44,12 +44,6 @@ use crate::{ /// A specialized `Error` for filesystem object store-related errors #[derive(Debug, thiserror::Error)] pub(crate) enum Error { - #[error("File size for {} did not fit in a usize: {}", path, source)] - FileSizeOverflowedUsize { - source: std::num::TryFromIntError, - path: String, - }, - #[error("Unable to walk dir: {}", source)] UnableToWalkDir { source: walkdir::Error }, @@ -83,8 +77,8 @@ pub(crate) enum Error { #[error("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual)] OutOfRange { path: PathBuf, - expected: usize, - actual: usize, + expected: u64, + actual: u64, }, #[error("Requested range was invalid")] @@ -410,7 +404,7 @@ impl ObjectStore for LocalFileSystem { let path = self.path_to_filesystem(&location)?; maybe_spawn_blocking(move || { let (file, metadata) = open_file(&path)?; - let meta = convert_metadata(metadata, location)?; + let meta = convert_metadata(metadata, location); options.check_preconditions(&meta)?; let range = match options.range { @@ -430,7 +424,7 @@ impl ObjectStore for LocalFileSystem { .await } - async fn get_range(&self, location: &Path, range: Range) -> Result { + async fn get_range(&self, location: &Path, range: Range) -> Result { let path = self.path_to_filesystem(location)?; maybe_spawn_blocking(move || { let (mut file, _) = open_file(&path)?; @@ -439,7 +433,7 @@ impl ObjectStore for LocalFileSystem { .await } - async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { + async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { let path = self.path_to_filesystem(location)?; let ranges = ranges.to_vec(); maybe_spawn_blocking(move || { @@ -825,8 +819,8 @@ impl Drop for LocalUpload { pub(crate) fn chunked_stream( mut file: File, path: PathBuf, - range: Range, - chunk_size: usize, + range: Range, + chunk_size: u64, ) -> BoxStream<'static, Result> { futures::stream::once(async move { let (file, path) = maybe_spawn_blocking(move || { @@ -848,7 +842,7 @@ pub(crate) fn chunked_stream( } let to_read = remaining.min(chunk_size); - let mut buffer = Vec::with_capacity(to_read); + let mut buffer = Vec::with_capacity(to_read as usize); let read = (&mut file) .take(to_read as u64) .read_to_end(&mut buffer) @@ -857,7 +851,7 @@ pub(crate) fn chunked_stream( path: path.clone(), })?; - Ok(Some((buffer.into(), (file, path, remaining - read)))) + Ok(Some((buffer.into(), (file, path, remaining - read as u64)))) }) }, ); @@ -867,7 +861,7 @@ pub(crate) fn chunked_stream( .boxed() } -pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range) -> Result { +pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range) -> Result { let to_read = range.end - range.start; file.seek(SeekFrom::Start(range.start as u64)) .map_err(|source| { @@ -875,14 +869,11 @@ pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range) - Error::Seek { source, path } })?; - let mut buf = Vec::with_capacity(to_read); - let read = file - .take(to_read as u64) - .read_to_end(&mut buf) - .map_err(|source| { - let path = path.into(); - Error::UnableToReadBytes { source, path } - })?; + let mut buf = Vec::with_capacity(to_read as usize); + let read = file.take(to_read).read_to_end(&mut buf).map_err(|source| { + let path = path.into(); + Error::UnableToReadBytes { source, path } + })? as u64; if read != to_read { let error = Error::OutOfRange { @@ -922,7 +913,7 @@ fn open_file(path: &PathBuf) -> Result<(File, Metadata)> { fn convert_entry(entry: DirEntry, location: Path) -> Result> { match entry.metadata() { - Ok(metadata) => convert_metadata(metadata, location).map(Some), + Ok(metadata) => Ok(Some(convert_metadata(metadata, location))), Err(e) => { if let Some(io_err) = e.io_error() { if io_err.kind() == ErrorKind::NotFound { @@ -960,20 +951,16 @@ fn get_etag(metadata: &Metadata) -> String { format!("{inode:x}-{mtime:x}-{size:x}") } -fn convert_metadata(metadata: Metadata, location: Path) -> Result { +fn convert_metadata(metadata: Metadata, location: Path) -> ObjectMeta { let last_modified = last_modified(&metadata); - let size = usize::try_from(metadata.len()).map_err(|source| { - let path = location.as_ref().into(); - Error::FileSizeOverflowedUsize { source, path } - })?; - Ok(ObjectMeta { + ObjectMeta { location, last_modified, - size, + size: metadata.len(), e_tag: Some(get_etag(&metadata)), version: None, - }) + } } #[cfg(unix)] diff --git a/object_store/src/util.rs b/object_store/src/util.rs index d823bb77491..6671e54c089 100644 --- a/object_store/src/util.rs +++ b/object_store/src/util.rs @@ -324,7 +324,7 @@ mod tests { /// Calls coalesce_ranges and validates the returned data is correct /// /// Returns the fetched ranges - async fn do_fetch(ranges: Vec>, coalesce: usize) -> Vec> { + async fn do_fetch(ranges: Vec>, coalesce: u64) -> Vec> { let max = ranges.iter().map(|x| x.end).max().unwrap_or(0); let src: Vec<_> = (0..max).map(|x| x as u8).collect(); @@ -333,7 +333,9 @@ mod tests { &ranges, |range| { fetches.push(range.clone()); - futures::future::ready(Ok(Bytes::from(src[range].to_vec()))) + futures::future::ready(Ok(Bytes::from( + src[range.start as usize..range.end as usize].to_vec(), + ))) }, coalesce, ) @@ -342,7 +344,10 @@ mod tests { assert_eq!(ranges.len(), coalesced.len()); for (range, bytes) in ranges.iter().zip(coalesced) { - assert_eq!(bytes.as_ref(), &src[range.clone()]); + assert_eq!( + bytes.as_ref(), + &src[range.start as usize..range.end as usize] + ); } fetches } diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs index e500fc8ac87..6790c11e71c 100644 --- a/object_store/tests/get_range_file.rs +++ b/object_store/tests/get_range_file.rs @@ -90,12 +90,15 @@ async fn test_get_range() { let fetched = store.get(&path).await.unwrap().bytes().await.unwrap(); assert_eq!(expected, fetched); - for range in [0..10, 3..5, 0..expected.len()] { + for range in [0..10, 3..5, 0..expected.len() as u64] { let data = store.get_range(&path, range.clone()).await.unwrap(); - assert_eq!(&data[..], &expected[range]) + assert_eq!( + &data[..], + &expected[range.start as usize..range.end as usize] + ) } - let over_range = 0..(expected.len() * 2); + let over_range = 0..(expected.len() as u64 * 2); let data = store.get_range(&path, over_range.clone()).await.unwrap(); assert_eq!(&data[..], expected) } @@ -113,10 +116,10 @@ async fn test_get_opts_over_range() { store.put(&path, expected.clone().into()).await.unwrap(); let opts = GetOptions { - range: Some(GetRange::Bounded(0..(expected.len() * 2))), + range: Some(GetRange::Bounded(0..(expected.len() as u64 * 2))), ..Default::default() }; let res = store.get_opts(&path, opts).await.unwrap(); - assert_eq!(res.range, 0..expected.len()); + assert_eq!(res.range, 0..expected.len() as u64); assert_eq!(res.bytes().await.unwrap(), expected); } From 450a2685b0bd38092262552a330c17dd81d7182e Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 10 Jan 2025 09:43:18 -0600 Subject: [PATCH 3/7] make clippy happy --- object_store/src/local.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/object_store/src/local.rs b/object_store/src/local.rs index d57ccd66c81..dd1e5478e4b 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -844,7 +844,7 @@ pub(crate) fn chunked_stream( let to_read = remaining.min(chunk_size); let mut buffer = Vec::with_capacity(to_read as usize); let read = (&mut file) - .take(to_read as u64) + .take(to_read) .read_to_end(&mut buffer) .map_err(|e| Error::UnableToReadBytes { source: e, @@ -863,11 +863,10 @@ pub(crate) fn chunked_stream( pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range) -> Result { let to_read = range.end - range.start; - file.seek(SeekFrom::Start(range.start as u64)) - .map_err(|source| { - let path = path.into(); - Error::Seek { source, path } - })?; + file.seek(SeekFrom::Start(range.start)).map_err(|source| { + let path = path.into(); + Error::Seek { source, path } + })?; let mut buf = Vec::with_capacity(to_read as usize); let read = file.take(to_read).read_to_end(&mut buf).map_err(|source| { From b6670cda62f8bbde79fecf4fe5de8079178276fe Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 10 Jan 2025 10:50:25 -0600 Subject: [PATCH 4/7] even more u64 --- object_store/src/azure/client.rs | 2 +- object_store/src/client/get.rs | 10 +++++----- object_store/src/client/s3.rs | 2 +- object_store/src/http/client.rs | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index fa5412c455f..2c2e27ea417 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -1058,7 +1058,7 @@ impl TryFrom for ObjectMeta { Ok(Self { location: Path::parse(value.name)?, last_modified: value.properties.last_modified, - size: value.properties.content_length as usize, + size: value.properties.content_length, e_tag: value.properties.e_tag, version: None, // For consistency with S3 and GCP which don't include this }) diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs index 57aca895645..f252dd9c2a2 100644 --- a/object_store/src/client/get.rs +++ b/object_store/src/client/get.rs @@ -67,9 +67,9 @@ impl GetClientExt for T { struct ContentRange { /// The range of the object returned - range: Range, + range: Range, /// The total size of the object being requested - size: usize, + size: u64, } impl ContentRange { @@ -84,7 +84,7 @@ impl ContentRange { let (start_s, end_s) = range.split_once('-')?; let start = start_s.parse().ok()?; - let end: usize = end_s.parse().ok()?; + let end: u64 = end_s.parse().ok()?; Some(Self { size, @@ -140,8 +140,8 @@ enum GetResultError { #[error("Requested {expected:?}, got {actual:?}")] UnexpectedRange { - expected: Range, - actual: Range, + expected: Range, + actual: Range, }, } diff --git a/object_store/src/client/s3.rs b/object_store/src/client/s3.rs index 7fe956b2376..a2221fbbc10 100644 --- a/object_store/src/client/s3.rs +++ b/object_store/src/client/s3.rs @@ -66,7 +66,7 @@ pub struct ListPrefix { #[serde(rename_all = "PascalCase")] pub struct ListContents { pub key: String, - pub size: usize, + pub size: u64, pub last_modified: DateTime, #[serde(rename = "ETag")] pub e_tag: Option, diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index 41e6464c199..9983fdff5cd 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -420,7 +420,7 @@ impl MultiStatusResponse { })?) } - fn size(&self) -> Result { + fn size(&self) -> Result { let size = self .prop_stat .prop @@ -462,7 +462,7 @@ pub(crate) struct Prop { last_modified: DateTime, #[serde(rename = "getcontentlength")] - content_length: Option, + content_length: Option, #[serde(rename = "resourcetype")] resource_type: ResourceType, From af30a4cb862f72ddf9bf166cb880085b110e0dac Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 15 Jan 2025 16:49:50 -0500 Subject: [PATCH 5/7] Update object_store/src/lib.rs Co-authored-by: Andrew Lamb --- object_store/src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index d56668b2bbf..680d986b014 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -903,7 +903,9 @@ pub struct ObjectMeta { pub location: Path, /// The last modified time pub last_modified: DateTime, - /// The size in bytes of the object + /// The size in bytes of the object. + /// + /// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM pub size: u64, /// The unique identifier for the object /// From 5608f0159e78ebb309dd50ec7652a82d64900c76 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 15 Jan 2025 16:50:02 -0500 Subject: [PATCH 6/7] Update object_store/src/lib.rs Co-authored-by: Andrew Lamb --- object_store/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 680d986b014..92a27486492 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1021,6 +1021,8 @@ pub struct GetResult { /// The [`ObjectMeta`] for this object pub meta: ObjectMeta, /// The range of bytes returned by this request + /// + /// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM pub range: Range, /// Additional object attributes pub attributes: Attributes, From 077e99b346b9c46cdc7bb0b9b8cd281835a9ce55 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 15 Jan 2025 20:49:53 -0600 Subject: [PATCH 7/7] address comments --- object_store/src/chunked.rs | 10 +++++----- object_store/src/lib.rs | 10 +++++++--- object_store/src/local.rs | 12 +++++++++--- object_store/src/memory.rs | 15 +++++++++++++-- object_store/src/util.rs | 27 +++++++++++++++++++-------- 5 files changed, 53 insertions(+), 21 deletions(-) diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs index 30fcf730226..2bb30b905c1 100644 --- a/object_store/src/chunked.rs +++ b/object_store/src/chunked.rs @@ -44,12 +44,12 @@ use crate::{PutPayload, Result}; #[derive(Debug)] pub struct ChunkedStore { inner: Arc, - chunk_size: u64, + chunk_size: usize, // chunks are in memory, so we use usize not u64 } impl ChunkedStore { /// Creates a new [`ChunkedStore`] with the specified chunk_size - pub fn new(inner: Arc, chunk_size: u64) -> Self { + pub fn new(inner: Arc, chunk_size: usize) -> Self { Self { inner, chunk_size } } } @@ -100,7 +100,7 @@ impl ObjectStore for ChunkedStore { if exhausted { return None; } - while buffer.len() < chunk_size as usize { + while buffer.len() < chunk_size { match stream.next().await { None => { exhausted = true; @@ -125,7 +125,7 @@ impl ObjectStore for ChunkedStore { }; } // Return the chunked values as the next value in the stream - let slice = buffer.split_to(chunk_size as usize).freeze(); + let slice = buffer.split_to(chunk_size).freeze(); Some((Ok(slice), (stream, buffer, exhausted, chunk_size))) }, ) @@ -204,7 +204,7 @@ mod tests { let mut remaining = 1001; while let Some(next) = s.next().await { let size = next.unwrap().len() as u64; - let expected = remaining.min(chunk_size); + let expected = remaining.min(chunk_size as u64); assert_eq!(size, expected); remaining -= expected; } diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 92a27486492..cffcbbdd435 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -903,7 +903,7 @@ pub struct ObjectMeta { pub location: Path, /// The last modified time pub last_modified: DateTime, - /// The size in bytes of the object. + /// The size in bytes of the object. /// /// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM pub size: u64, @@ -1064,7 +1064,11 @@ impl GetResult { path: path.clone(), })?; - let mut buffer = Vec::with_capacity(len as usize); + let mut buffer = if let Ok(len) = len.try_into() { + Vec::with_capacity(len) + } else { + Vec::new() + }; file.take(len as _) .read_to_end(&mut buffer) .map_err(|source| local::Error::UnableToReadBytes { source, path })?; @@ -1097,7 +1101,7 @@ impl GetResult { match self.payload { #[cfg(all(feature = "fs", not(target_arch = "wasm32")))] GetResultPayload::File(file, path) => { - const CHUNK_SIZE: u64 = 8 * 1024; + const CHUNK_SIZE: usize = 8 * 1024; local::chunked_stream(file, path, self.range, CHUNK_SIZE) } GetResultPayload::Stream(s) => s, diff --git a/object_store/src/local.rs b/object_store/src/local.rs index dd1e5478e4b..65e87f9821a 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -820,7 +820,7 @@ pub(crate) fn chunked_stream( mut file: File, path: PathBuf, range: Range, - chunk_size: u64, + chunk_size: usize, ) -> BoxStream<'static, Result> { futures::stream::once(async move { let (file, path) = maybe_spawn_blocking(move || { @@ -841,8 +841,14 @@ pub(crate) fn chunked_stream( return Ok(None); } - let to_read = remaining.min(chunk_size); - let mut buffer = Vec::with_capacity(to_read as usize); + let to_read = remaining.min(chunk_size as u64); + let cap = usize::try_from(to_read).map_err(|_e| Error::InvalidRange { + source: InvalidGetRange::TooLarge { + requested: to_read, + max: usize::MAX as u64, + }, + })?; + let mut buffer = Vec::with_capacity(cap); let read = (&mut file) .take(to_read) .read_to_end(&mut buffer) diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 040c87d3951..26beff1e1f5 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -277,8 +277,19 @@ impl ObjectStore for InMemory { let r = GetRange::Bounded(range.clone()) .as_range(entry.data.len() as u64) .map_err(|source| Error::Range { source })?; - let r = r.start as usize..r.end as usize; - Ok(entry.data.slice(r)) + let r_end = usize::try_from(r.end).map_err(|_e| Error::Range { + source: InvalidGetRange::TooLarge { + requested: r.end, + max: usize::MAX as u64, + }, + })?; + let r_start = usize::try_from(r.start).map_err(|_e| Error::Range { + source: InvalidGetRange::TooLarge { + requested: r.start, + max: usize::MAX as u64, + }, + })?; + Ok(entry.data.slice(r_start..r_end)) }) .collect() } diff --git a/object_store/src/util.rs b/object_store/src/util.rs index 6671e54c089..17a7a8cad4c 100644 --- a/object_store/src/util.rs +++ b/object_store/src/util.rs @@ -197,6 +197,9 @@ pub enum GetRange { /// an error will be returned. Additionally, if the range ends after the end /// of the object, the entire remainder of the object will be returned. /// Otherwise, the exact requested range will be returned. + /// + /// Note that range is u64 (i.e., not usize), + /// as `object_store` supports 32-bit architectures such as WASM Bounded(Range), /// Request all bytes starting from a given byte offset Offset(u64), @@ -211,19 +214,27 @@ pub(crate) enum InvalidGetRange { #[error("Range started at {start} and ended at {end}")] Inconsistent { start: u64, end: u64 }, + + #[error("Range {requested} is larger than system memory limit {max}")] + TooLarge { requested: u64, max: u64 }, } impl GetRange { pub(crate) fn is_valid(&self) -> Result<(), InvalidGetRange> { - match self { - Self::Bounded(r) if r.end <= r.start => { + if let Self::Bounded(r) = self { + if r.end <= r.start { return Err(InvalidGetRange::Inconsistent { start: r.start, end: r.end, }); } - _ => (), - }; + if (r.end - r.start) > usize::MAX as u64 { + return Err(InvalidGetRange::TooLarge { + requested: r.start, + max: usize::MAX as u64, + }); + } + } Ok(()) } @@ -333,9 +344,9 @@ mod tests { &ranges, |range| { fetches.push(range.clone()); - futures::future::ready(Ok(Bytes::from( - src[range.start as usize..range.end as usize].to_vec(), - ))) + let start = usize::try_from(range.start).unwrap(); + let end = usize::try_from(range.end).unwrap(); + futures::future::ready(Ok(Bytes::from(src[start..end].to_vec()))) }, coalesce, ) @@ -346,7 +357,7 @@ mod tests { for (range, bytes) in ranges.iter().zip(coalesced) { assert_eq!( bytes.as_ref(), - &src[range.start as usize..range.end as usize] + &src[usize::try_from(range.start).unwrap()..usize::try_from(range.end).unwrap()] ); } fetches