forked from apache/arrow-rs
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add ObjectStore BufReader (apache#4762) (apache#4857)
* Add ObjectStore BufReader (apache#4762) * Clippy * More Clippy * Fix MSRV * Fix doc
- Loading branch information
Showing
2 changed files
with
294 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,293 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
//! Utilities for performing tokio-style buffered IO | ||
use crate::path::Path; | ||
use crate::{ObjectMeta, ObjectStore}; | ||
use bytes::Bytes; | ||
use futures::future::{BoxFuture, FutureExt}; | ||
use futures::ready; | ||
use std::cmp::Ordering; | ||
use std::io::{Error, ErrorKind, SeekFrom}; | ||
use std::pin::Pin; | ||
use std::sync::Arc; | ||
use std::task::{Context, Poll}; | ||
use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, ReadBuf}; | ||
|
||
/// The default buffer size used by [`BufReader`] | ||
pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024; | ||
|
||
/// An async-buffered reader compatible with the tokio IO traits | ||
/// | ||
/// Internally this maintains a buffer of the requested size, and uses [`ObjectStore::get_range`] | ||
/// to populate its internal buffer once depleted. This buffer is cleared on seek. | ||
/// | ||
/// Whilst simple, this interface will typically be outperformed by the native [`ObjectStore`] | ||
/// methods that better map to the network APIs. This is because most object stores have | ||
/// very [high first-byte latencies], on the order of 100-200ms, and so avoiding unnecessary | ||
/// round-trips is critical to throughput. | ||
/// | ||
/// Systems looking to sequentially scan a file should instead consider using [`ObjectStore::get`], | ||
/// or [`ObjectStore::get_opts`], or [`ObjectStore::get_range`] to read a particular range. | ||
/// | ||
/// Systems looking to read multiple ranges of a file should instead consider using | ||
/// [`ObjectStore::get_ranges`], which will optimise the vectored IO. | ||
/// | ||
/// [high first-byte latencies]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html | ||
pub struct BufReader { | ||
/// The object store to fetch data from | ||
store: Arc<dyn ObjectStore>, | ||
/// The size of the object | ||
size: u64, | ||
/// The path to the object | ||
path: Path, | ||
/// The current position in the object | ||
cursor: u64, | ||
/// The number of bytes to read in a single request | ||
capacity: usize, | ||
/// The buffered data if any | ||
buffer: Buffer, | ||
} | ||
|
||
impl std::fmt::Debug for BufReader { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
f.debug_struct("BufReader") | ||
.field("path", &self.path) | ||
.field("size", &self.size) | ||
.field("capacity", &self.capacity) | ||
.finish() | ||
} | ||
} | ||
|
||
enum Buffer { | ||
Empty, | ||
Pending(BoxFuture<'static, std::io::Result<Bytes>>), | ||
Ready(Bytes), | ||
} | ||
|
||
impl BufReader { | ||
/// Create a new [`BufReader`] from the provided [`ObjectMeta`] and [`ObjectStore`] | ||
pub fn new(store: Arc<dyn ObjectStore>, meta: &ObjectMeta) -> Self { | ||
Self::with_capacity(store, meta, DEFAULT_BUFFER_SIZE) | ||
} | ||
|
||
/// Create a new [`BufReader`] from the provided [`ObjectMeta`], [`ObjectStore`], and `capacity` | ||
pub fn with_capacity( | ||
store: Arc<dyn ObjectStore>, | ||
meta: &ObjectMeta, | ||
capacity: usize, | ||
) -> Self { | ||
Self { | ||
path: meta.location.clone(), | ||
size: meta.size as _, | ||
store, | ||
capacity, | ||
cursor: 0, | ||
buffer: Buffer::Empty, | ||
} | ||
} | ||
|
||
fn poll_fill_buf_impl( | ||
&mut self, | ||
cx: &mut Context<'_>, | ||
amnt: usize, | ||
) -> Poll<std::io::Result<&[u8]>> { | ||
let buf = &mut self.buffer; | ||
loop { | ||
match buf { | ||
Buffer::Empty => { | ||
let store = Arc::clone(&self.store); | ||
let path = self.path.clone(); | ||
let start = self.cursor.min(self.size) as _; | ||
let end = self.cursor.saturating_add(amnt as u64).min(self.size) as _; | ||
|
||
if start == end { | ||
return Poll::Ready(Ok(&[])); | ||
} | ||
|
||
*buf = Buffer::Pending(Box::pin(async move { | ||
Ok(store.get_range(&path, start..end).await?) | ||
})) | ||
} | ||
Buffer::Pending(fut) => match ready!(fut.poll_unpin(cx)) { | ||
Ok(b) => *buf = Buffer::Ready(b), | ||
Err(e) => return Poll::Ready(Err(e)), | ||
}, | ||
Buffer::Ready(r) => return Poll::Ready(Ok(r)), | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl AsyncSeek for BufReader { | ||
fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> { | ||
self.cursor = match position { | ||
SeekFrom::Start(offset) => offset, | ||
SeekFrom::End(offset) => { | ||
checked_add_signed(self.size,offset).ok_or_else(|| Error::new(ErrorKind::InvalidInput, format!("Seeking {offset} from end of {} byte file would result in overflow", self.size)))? | ||
} | ||
SeekFrom::Current(offset) => { | ||
checked_add_signed(self.cursor, offset).ok_or_else(|| Error::new(ErrorKind::InvalidInput, format!("Seeking {offset} from current offset of {} would result in overflow", self.cursor)))? | ||
} | ||
}; | ||
self.buffer = Buffer::Empty; | ||
Ok(()) | ||
} | ||
|
||
fn poll_complete( | ||
self: Pin<&mut Self>, | ||
_cx: &mut Context<'_>, | ||
) -> Poll<std::io::Result<u64>> { | ||
Poll::Ready(Ok(self.cursor)) | ||
} | ||
} | ||
|
||
impl AsyncRead for BufReader { | ||
fn poll_read( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
out: &mut ReadBuf<'_>, | ||
) -> Poll<std::io::Result<()>> { | ||
// Read the maximum of the internal buffer and `out` | ||
let to_read = out.remaining().max(self.capacity); | ||
let r = match ready!(self.poll_fill_buf_impl(cx, to_read)) { | ||
Ok(buf) => { | ||
let to_consume = out.remaining().min(buf.len()); | ||
out.put_slice(&buf[..to_consume]); | ||
self.consume(to_consume); | ||
Ok(()) | ||
} | ||
Err(e) => Err(e), | ||
}; | ||
Poll::Ready(r) | ||
} | ||
} | ||
|
||
impl AsyncBufRead for BufReader { | ||
fn poll_fill_buf( | ||
self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
) -> Poll<std::io::Result<&[u8]>> { | ||
let capacity = self.capacity; | ||
self.get_mut().poll_fill_buf_impl(cx, capacity) | ||
} | ||
|
||
fn consume(mut self: Pin<&mut Self>, amt: usize) { | ||
match &mut self.buffer { | ||
Buffer::Empty => assert_eq!(amt, 0, "cannot consume from empty buffer"), | ||
Buffer::Ready(b) => match b.len().cmp(&amt) { | ||
Ordering::Less => panic!("{amt} exceeds buffer sized of {}", b.len()), | ||
Ordering::Greater => *b = b.slice(amt..), | ||
Ordering::Equal => self.buffer = Buffer::Empty, | ||
}, | ||
Buffer::Pending(_) => panic!("cannot consume from pending buffer"), | ||
} | ||
self.cursor += amt as u64; | ||
} | ||
} | ||
|
||
/// Port of standardised function as requires Rust 1.66 | ||
/// | ||
/// <https://github.com/rust-lang/rust/pull/87601/files#diff-b9390ee807a1dae3c3128dce36df56748ad8d23c6e361c0ebba4d744bf6efdb9R1533> | ||
#[inline] | ||
fn checked_add_signed(a: u64, rhs: i64) -> Option<u64> { | ||
let (res, overflowed) = a.overflowing_add(rhs as _); | ||
let overflow = overflowed ^ (rhs < 0); | ||
(!overflow).then_some(res) | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use crate::memory::InMemory; | ||
use crate::path::Path; | ||
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt}; | ||
|
||
#[tokio::test] | ||
async fn test_buf_reader() { | ||
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>; | ||
|
||
let existent = Path::from("exists.txt"); | ||
const BYTES: usize = 4096; | ||
|
||
let data: Bytes = b"12345678".iter().cycle().copied().take(BYTES).collect(); | ||
store.put(&existent, data.clone()).await.unwrap(); | ||
|
||
let meta = store.head(&existent).await.unwrap(); | ||
|
||
let mut reader = BufReader::new(Arc::clone(&store), &meta); | ||
let mut out = Vec::with_capacity(BYTES); | ||
let read = reader.read_to_end(&mut out).await.unwrap(); | ||
|
||
assert_eq!(read, BYTES); | ||
assert_eq!(&out, &data); | ||
|
||
let err = reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap_err(); | ||
assert_eq!(err.to_string(), "Seeking -9223372036854775808 from current offset of 4096 would result in overflow"); | ||
|
||
reader.rewind().await.unwrap(); | ||
|
||
let err = reader.seek(SeekFrom::Current(-1)).await.unwrap_err(); | ||
assert_eq!( | ||
err.to_string(), | ||
"Seeking -1 from current offset of 0 would result in overflow" | ||
); | ||
|
||
// Seeking beyond the bounds of the file is permitted but should return no data | ||
reader.seek(SeekFrom::Start(u64::MAX)).await.unwrap(); | ||
let buf = reader.fill_buf().await.unwrap(); | ||
assert!(buf.is_empty()); | ||
|
||
let err = reader.seek(SeekFrom::Current(1)).await.unwrap_err(); | ||
assert_eq!(err.to_string(), "Seeking 1 from current offset of 18446744073709551615 would result in overflow"); | ||
|
||
for capacity in [200, 1024, 4096, DEFAULT_BUFFER_SIZE] { | ||
let store = Arc::clone(&store); | ||
let mut reader = BufReader::with_capacity(store, &meta, capacity); | ||
|
||
let mut bytes_read = 0; | ||
loop { | ||
let buf = reader.fill_buf().await.unwrap(); | ||
if buf.is_empty() { | ||
assert_eq!(bytes_read, BYTES); | ||
break; | ||
} | ||
assert!(buf.starts_with(b"12345678")); | ||
bytes_read += 8; | ||
reader.consume(8); | ||
} | ||
|
||
let mut buf = Vec::with_capacity(76); | ||
reader.seek(SeekFrom::Current(-76)).await.unwrap(); | ||
reader.read_to_end(&mut buf).await.unwrap(); | ||
assert_eq!(&buf, &data[BYTES - 76..]); | ||
|
||
reader.rewind().await.unwrap(); | ||
let buffer = reader.fill_buf().await.unwrap(); | ||
assert_eq!(buffer, &data[..capacity.min(BYTES)]); | ||
|
||
reader.seek(SeekFrom::Start(325)).await.unwrap(); | ||
let buffer = reader.fill_buf().await.unwrap(); | ||
assert_eq!(buffer, &data[325..(325 + capacity).min(BYTES)]); | ||
|
||
reader.seek(SeekFrom::End(0)).await.unwrap(); | ||
let buffer = reader.fill_buf().await.unwrap(); | ||
assert!(buffer.is_empty()); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters