Skip to content

Commit

Permalink
Add ObjectStore BufReader (#4762) (#4857)
Browse files Browse the repository at this point in the history
* Add ObjectStore BufReader (#4762)

* Clippy

* More Clippy

* Fix MSRV

* Fix doc
  • Loading branch information
tustvold authored Sep 25, 2023
1 parent 74e2c5c commit 2c9e2e9
Show file tree
Hide file tree
Showing 2 changed files with 294 additions and 0 deletions.
293 changes: 293 additions & 0 deletions object_store/src/buffered.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Utilities for performing tokio-style buffered IO
use crate::path::Path;
use crate::{ObjectMeta, ObjectStore};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use std::cmp::Ordering;
use std::io::{Error, ErrorKind, SeekFrom};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, ReadBuf};

/// The default buffer size used by [`BufReader`]
pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;

/// An async-buffered reader compatible with the tokio IO traits
///
/// Internally this maintains a buffer of the requested size, and uses [`ObjectStore::get_range`]
/// to populate its internal buffer once depleted. This buffer is cleared on seek.
///
/// Whilst simple, this interface will typically be outperformed by the native [`ObjectStore`]
/// methods that better map to the network APIs. This is because most object stores have
/// very [high first-byte latencies], on the order of 100-200ms, and so avoiding unnecessary
/// round-trips is critical to throughput.
///
/// Systems looking to sequentially scan a file should instead consider using [`ObjectStore::get`],
/// or [`ObjectStore::get_opts`], or [`ObjectStore::get_range`] to read a particular range.
///
/// Systems looking to read multiple ranges of a file should instead consider using
/// [`ObjectStore::get_ranges`], which will optimise the vectored IO.
///
/// [high first-byte latencies]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html
pub struct BufReader {
/// The object store to fetch data from
store: Arc<dyn ObjectStore>,
/// The size of the object
size: u64,
/// The path to the object
path: Path,
/// The current position in the object
cursor: u64,
/// The number of bytes to read in a single request
capacity: usize,
/// The buffered data if any
buffer: Buffer,
}

impl std::fmt::Debug for BufReader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BufReader")
.field("path", &self.path)
.field("size", &self.size)
.field("capacity", &self.capacity)
.finish()
}
}

enum Buffer {
Empty,
Pending(BoxFuture<'static, std::io::Result<Bytes>>),
Ready(Bytes),
}

impl BufReader {
/// Create a new [`BufReader`] from the provided [`ObjectMeta`] and [`ObjectStore`]
pub fn new(store: Arc<dyn ObjectStore>, meta: &ObjectMeta) -> Self {
Self::with_capacity(store, meta, DEFAULT_BUFFER_SIZE)
}

/// Create a new [`BufReader`] from the provided [`ObjectMeta`], [`ObjectStore`], and `capacity`
pub fn with_capacity(
store: Arc<dyn ObjectStore>,
meta: &ObjectMeta,
capacity: usize,
) -> Self {
Self {
path: meta.location.clone(),
size: meta.size as _,
store,
capacity,
cursor: 0,
buffer: Buffer::Empty,
}
}

fn poll_fill_buf_impl(
&mut self,
cx: &mut Context<'_>,
amnt: usize,
) -> Poll<std::io::Result<&[u8]>> {
let buf = &mut self.buffer;
loop {
match buf {
Buffer::Empty => {
let store = Arc::clone(&self.store);
let path = self.path.clone();
let start = self.cursor.min(self.size) as _;
let end = self.cursor.saturating_add(amnt as u64).min(self.size) as _;

if start == end {
return Poll::Ready(Ok(&[]));
}

*buf = Buffer::Pending(Box::pin(async move {
Ok(store.get_range(&path, start..end).await?)
}))
}
Buffer::Pending(fut) => match ready!(fut.poll_unpin(cx)) {
Ok(b) => *buf = Buffer::Ready(b),
Err(e) => return Poll::Ready(Err(e)),
},
Buffer::Ready(r) => return Poll::Ready(Ok(r)),
}
}
}
}

impl AsyncSeek for BufReader {
fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
self.cursor = match position {
SeekFrom::Start(offset) => offset,
SeekFrom::End(offset) => {
checked_add_signed(self.size,offset).ok_or_else(|| Error::new(ErrorKind::InvalidInput, format!("Seeking {offset} from end of {} byte file would result in overflow", self.size)))?
}
SeekFrom::Current(offset) => {
checked_add_signed(self.cursor, offset).ok_or_else(|| Error::new(ErrorKind::InvalidInput, format!("Seeking {offset} from current offset of {} would result in overflow", self.cursor)))?
}
};
self.buffer = Buffer::Empty;
Ok(())
}

fn poll_complete(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<std::io::Result<u64>> {
Poll::Ready(Ok(self.cursor))
}
}

impl AsyncRead for BufReader {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
out: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
// Read the maximum of the internal buffer and `out`
let to_read = out.remaining().max(self.capacity);
let r = match ready!(self.poll_fill_buf_impl(cx, to_read)) {
Ok(buf) => {
let to_consume = out.remaining().min(buf.len());
out.put_slice(&buf[..to_consume]);
self.consume(to_consume);
Ok(())
}
Err(e) => Err(e),
};
Poll::Ready(r)
}
}

impl AsyncBufRead for BufReader {
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::io::Result<&[u8]>> {
let capacity = self.capacity;
self.get_mut().poll_fill_buf_impl(cx, capacity)
}

fn consume(mut self: Pin<&mut Self>, amt: usize) {
match &mut self.buffer {
Buffer::Empty => assert_eq!(amt, 0, "cannot consume from empty buffer"),
Buffer::Ready(b) => match b.len().cmp(&amt) {
Ordering::Less => panic!("{amt} exceeds buffer sized of {}", b.len()),
Ordering::Greater => *b = b.slice(amt..),
Ordering::Equal => self.buffer = Buffer::Empty,
},
Buffer::Pending(_) => panic!("cannot consume from pending buffer"),
}
self.cursor += amt as u64;
}
}

/// Port of standardised function as requires Rust 1.66
///
/// <https://github.com/rust-lang/rust/pull/87601/files#diff-b9390ee807a1dae3c3128dce36df56748ad8d23c6e361c0ebba4d744bf6efdb9R1533>
#[inline]
fn checked_add_signed(a: u64, rhs: i64) -> Option<u64> {
let (res, overflowed) = a.overflowing_add(rhs as _);
let overflow = overflowed ^ (rhs < 0);
(!overflow).then_some(res)
}

#[cfg(test)]
mod tests {
use super::*;
use crate::memory::InMemory;
use crate::path::Path;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt};

#[tokio::test]
async fn test_buf_reader() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

let existent = Path::from("exists.txt");
const BYTES: usize = 4096;

let data: Bytes = b"12345678".iter().cycle().copied().take(BYTES).collect();
store.put(&existent, data.clone()).await.unwrap();

let meta = store.head(&existent).await.unwrap();

let mut reader = BufReader::new(Arc::clone(&store), &meta);
let mut out = Vec::with_capacity(BYTES);
let read = reader.read_to_end(&mut out).await.unwrap();

assert_eq!(read, BYTES);
assert_eq!(&out, &data);

let err = reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap_err();
assert_eq!(err.to_string(), "Seeking -9223372036854775808 from current offset of 4096 would result in overflow");

reader.rewind().await.unwrap();

let err = reader.seek(SeekFrom::Current(-1)).await.unwrap_err();
assert_eq!(
err.to_string(),
"Seeking -1 from current offset of 0 would result in overflow"
);

// Seeking beyond the bounds of the file is permitted but should return no data
reader.seek(SeekFrom::Start(u64::MAX)).await.unwrap();
let buf = reader.fill_buf().await.unwrap();
assert!(buf.is_empty());

let err = reader.seek(SeekFrom::Current(1)).await.unwrap_err();
assert_eq!(err.to_string(), "Seeking 1 from current offset of 18446744073709551615 would result in overflow");

for capacity in [200, 1024, 4096, DEFAULT_BUFFER_SIZE] {
let store = Arc::clone(&store);
let mut reader = BufReader::with_capacity(store, &meta, capacity);

let mut bytes_read = 0;
loop {
let buf = reader.fill_buf().await.unwrap();
if buf.is_empty() {
assert_eq!(bytes_read, BYTES);
break;
}
assert!(buf.starts_with(b"12345678"));
bytes_read += 8;
reader.consume(8);
}

let mut buf = Vec::with_capacity(76);
reader.seek(SeekFrom::Current(-76)).await.unwrap();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(&buf, &data[BYTES - 76..]);

reader.rewind().await.unwrap();
let buffer = reader.fill_buf().await.unwrap();
assert_eq!(buffer, &data[..capacity.min(BYTES)]);

reader.seek(SeekFrom::Start(325)).await.unwrap();
let buffer = reader.fill_buf().await.unwrap();
assert_eq!(buffer, &data[325..(325 + capacity).min(BYTES)]);

reader.seek(SeekFrom::End(0)).await.unwrap();
let buffer = reader.fill_buf().await.unwrap();
assert!(buffer.is_empty());
}
}
}
1 change: 1 addition & 0 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ compile_error!("Features 'gcp', 'aws', 'azure', 'http' are not supported on wasm
pub mod aws;
#[cfg(feature = "azure")]
pub mod azure;
pub mod buffered;
#[cfg(not(target_arch = "wasm32"))]
pub mod chunked;
pub mod delimited;
Expand Down

0 comments on commit 2c9e2e9

Please sign in to comment.