Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
XiangpengHao committed Jan 16, 2025
1 parent 5608f01 commit 077e99b
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 21 deletions.
10 changes: 5 additions & 5 deletions object_store/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ use crate::{PutPayload, Result};
#[derive(Debug)]
pub struct ChunkedStore {
/// Shared handle to the inner [`ObjectStore`]
inner: Arc<dyn ObjectStore>,
chunk_size: u64,
/// Number of buffered bytes at which a chunk is split off and yielded
chunk_size: usize, // chunks are in memory, so we use usize not u64
}

impl ChunkedStore {
/// Creates a new [`ChunkedStore`] with the specified chunk_size
///
/// `inner` and `chunk_size` are stored as given; no validation is performed here.
pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: u64) -> Self {
pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: usize) -> Self {
Self { inner, chunk_size }
}
}
Expand Down Expand Up @@ -100,7 +100,7 @@ impl ObjectStore for ChunkedStore {
if exhausted {
return None;
}
while buffer.len() < chunk_size as usize {
while buffer.len() < chunk_size {
match stream.next().await {
None => {
exhausted = true;
Expand All @@ -125,7 +125,7 @@ impl ObjectStore for ChunkedStore {
};
}
// Return the chunked values as the next value in the stream
let slice = buffer.split_to(chunk_size as usize).freeze();
let slice = buffer.split_to(chunk_size).freeze();
Some((Ok(slice), (stream, buffer, exhausted, chunk_size)))
},
)
Expand Down Expand Up @@ -204,7 +204,7 @@ mod tests {
let mut remaining = 1001;
while let Some(next) = s.next().await {
let size = next.unwrap().len() as u64;
let expected = remaining.min(chunk_size);
let expected = remaining.min(chunk_size as u64);
assert_eq!(size, expected);
remaining -= expected;
}
Expand Down
10 changes: 7 additions & 3 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ pub struct ObjectMeta {
pub location: Path,
/// The last modified time
pub last_modified: DateTime<Utc>,
/// The size in bytes of the object.
/// The size in bytes of the object.
///
/// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM
pub size: u64,
Expand Down Expand Up @@ -1064,7 +1064,11 @@ impl GetResult {
path: path.clone(),
})?;

let mut buffer = Vec::with_capacity(len as usize);
let mut buffer = if let Ok(len) = len.try_into() {
Vec::with_capacity(len)
} else {
Vec::new()
};
file.take(len as _)
.read_to_end(&mut buffer)
.map_err(|source| local::Error::UnableToReadBytes { source, path })?;
Expand Down Expand Up @@ -1097,7 +1101,7 @@ impl GetResult {
match self.payload {
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
GetResultPayload::File(file, path) => {
const CHUNK_SIZE: u64 = 8 * 1024;
const CHUNK_SIZE: usize = 8 * 1024;
local::chunked_stream(file, path, self.range, CHUNK_SIZE)
}
GetResultPayload::Stream(s) => s,
Expand Down
12 changes: 9 additions & 3 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ pub(crate) fn chunked_stream(
mut file: File,
path: PathBuf,
range: Range<u64>,
chunk_size: u64,
chunk_size: usize,
) -> BoxStream<'static, Result<Bytes, super::Error>> {
futures::stream::once(async move {
let (file, path) = maybe_spawn_blocking(move || {
Expand All @@ -841,8 +841,14 @@ pub(crate) fn chunked_stream(
return Ok(None);
}

let to_read = remaining.min(chunk_size);
let mut buffer = Vec::with_capacity(to_read as usize);
let to_read = remaining.min(chunk_size as u64);
let cap = usize::try_from(to_read).map_err(|_e| Error::InvalidRange {
source: InvalidGetRange::TooLarge {
requested: to_read,
max: usize::MAX as u64,
},
})?;
let mut buffer = Vec::with_capacity(cap);
let read = (&mut file)
.take(to_read)
.read_to_end(&mut buffer)
Expand Down
15 changes: 13 additions & 2 deletions object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,19 @@ impl ObjectStore for InMemory {
let r = GetRange::Bounded(range.clone())
.as_range(entry.data.len() as u64)
.map_err(|source| Error::Range { source })?;
let r = r.start as usize..r.end as usize;
Ok(entry.data.slice(r))
let r_end = usize::try_from(r.end).map_err(|_e| Error::Range {
source: InvalidGetRange::TooLarge {
requested: r.end,
max: usize::MAX as u64,
},
})?;
let r_start = usize::try_from(r.start).map_err(|_e| Error::Range {
source: InvalidGetRange::TooLarge {
requested: r.start,
max: usize::MAX as u64,
},
})?;
Ok(entry.data.slice(r_start..r_end))
})
.collect()
}
Expand Down
27 changes: 19 additions & 8 deletions object_store/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ pub enum GetRange {
/// an error will be returned. Additionally, if the range ends after the end
/// of the object, the entire remainder of the object will be returned.
/// Otherwise, the exact requested range will be returned.
///
/// Note that range is u64 (i.e., not usize),
/// as `object_store` supports 32-bit architectures such as WASM
Bounded(Range<u64>),
/// Request all bytes starting from a given byte offset
Offset(u64),
Expand All @@ -211,19 +214,27 @@ pub(crate) enum InvalidGetRange {

#[error("Range started at {start} and ended at {end}")]
Inconsistent { start: u64, end: u64 },

#[error("Range {requested} is larger than system memory limit {max}")]
TooLarge { requested: u64, max: u64 },
}

impl GetRange {
pub(crate) fn is_valid(&self) -> Result<(), InvalidGetRange> {
match self {
Self::Bounded(r) if r.end <= r.start => {
if let Self::Bounded(r) = self {
if r.end <= r.start {
return Err(InvalidGetRange::Inconsistent {
start: r.start,
end: r.end,
});
}
_ => (),
};
if (r.end - r.start) > usize::MAX as u64 {
return Err(InvalidGetRange::TooLarge {
requested: r.start,
max: usize::MAX as u64,
});
}
}
Ok(())
}

Expand Down Expand Up @@ -333,9 +344,9 @@ mod tests {
&ranges,
|range| {
fetches.push(range.clone());
futures::future::ready(Ok(Bytes::from(
src[range.start as usize..range.end as usize].to_vec(),
)))
let start = usize::try_from(range.start).unwrap();
let end = usize::try_from(range.end).unwrap();
futures::future::ready(Ok(Bytes::from(src[start..end].to_vec())))
},
coalesce,
)
Expand All @@ -346,7 +357,7 @@ mod tests {
for (range, bytes) in ranges.iter().zip(coalesced) {
assert_eq!(
bytes.as_ref(),
&src[range.start as usize..range.end as usize]
&src[usize::try_from(range.start).unwrap()..usize::try_from(range.end).unwrap()]
);
}
fetches
Expand Down

0 comments on commit 077e99b

Please sign in to comment.