Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ObjectStore BufReader (#4762) #4857

Merged
merged 5 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 293 additions & 0 deletions object_store/src/buffered.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Utilities for performing tokio-style buffered IO

use crate::path::Path;
use crate::{ObjectMeta, ObjectStore};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use std::cmp::Ordering;
use std::io::{Error, ErrorKind, SeekFrom};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, ReadBuf};

/// The default buffer size used by [`BufReader`]: 1 MiB (`1024 * 1024` bytes)
///
/// This is the capacity [`BufReader::new`] passes to [`BufReader::with_capacity`].
pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;

/// An async-buffered reader compatible with the tokio IO traits
///
/// Internally this maintains a buffer of the requested size, and uses [`ObjectStore::get_range`]
/// to populate its internal buffer once depleted. This buffer is cleared on seek.
///
/// Whilst simple, this interface will typically be outperformed by the native [`ObjectStore`]
/// methods that better map to the network APIs. This is because most object stores have
/// very [high first-byte latencies], on the order of 100-200ms, and so avoiding unnecessary
/// round-trips is critical to throughput.
///
/// Systems looking to sequentially scan a file should instead consider using
/// [`ObjectStore::get`] or [`ObjectStore::get_opts`], and systems looking to read a
/// particular range should consider using [`ObjectStore::get_range`].
///
/// Systems looking to read multiple ranges of a file should instead consider using
/// [`ObjectStore::get_ranges`], which will optimise the vectored IO.
///
/// [high first-byte latencies]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html
pub struct BufReader {
/// The object store to fetch data from
store: Arc<dyn ObjectStore>,
/// The size of the object in bytes, as reported by the [`ObjectMeta`] supplied at
/// construction; requested ranges are clamped to this value
size: u64,
/// The path to the object
path: Path,
/// The current position in the object
///
/// Starts at 0, is advanced as buffered bytes are consumed and is overwritten by seeks.
/// It may lie beyond `size`, in which case reads yield no data.
cursor: u64,
/// The number of bytes to request when refilling the buffer via `poll_fill_buf`
///
/// `poll_read` requests the larger of this value and the space remaining in the
/// caller's output buffer.
capacity: usize,
/// The buffered data if any, or the in-flight request that will produce it
buffer: Buffer,
}

impl std::fmt::Debug for BufReader {
    /// Formats the reader as `BufReader { path, size, capacity }`
    ///
    /// The store, the cursor and the buffer state are not included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            path,
            size,
            capacity,
            ..
        } = self;

        let mut repr = f.debug_struct("BufReader");
        repr.field("path", path);
        repr.field("size", size);
        repr.field("capacity", capacity);
        repr.finish()
    }
}

/// The state of the internal read buffer
enum Buffer {
/// No data is buffered and no request is in flight
Empty,
/// A range request is in flight; it resolves to the fetched bytes or an IO error
Pending(BoxFuture<'static, std::io::Result<Bytes>>),
/// Fetched bytes that have not yet been consumed
Ready(Bytes),
}

impl BufReader {
    /// Create a new [`BufReader`] from the provided [`ObjectMeta`] and [`ObjectStore`]
    ///
    /// The reader uses a buffer capacity of [`DEFAULT_BUFFER_SIZE`].
    pub fn new(store: Arc<dyn ObjectStore>, meta: &ObjectMeta) -> Self {
        Self::with_capacity(store, meta, DEFAULT_BUFFER_SIZE)
    }

    /// Create a new [`BufReader`] from the provided [`ObjectMeta`], [`ObjectStore`], and `capacity`
    ///
    /// The object's location and size are copied from `meta`, the cursor starts at the
    /// beginning of the object, and no request is issued until the reader is first polled.
    pub fn with_capacity(
        store: Arc<dyn ObjectStore>,
        meta: &ObjectMeta,
        capacity: usize,
    ) -> Self {
        Self {
            path: meta.location.clone(),
            size: meta.size as _,
            store,
            capacity,
            cursor: 0,
            buffer: Buffer::Empty,
        }
    }

    /// Drive the internal buffer until it holds data, returning the buffered bytes
    ///
    /// When the buffer is empty this issues an [`ObjectStore::get_range`] request for up
    /// to `amnt` bytes starting at the current cursor, clamped to the size of the object.
    /// If the clamped range is empty (the cursor is at or beyond the end of the object)
    /// an empty slice is returned without issuing a request.
    ///
    /// Returns [`Poll::Pending`] whilst the request is in flight. If the request fails,
    /// the error is returned and the buffer is reset to empty, so that a subsequent call
    /// issues a fresh request.
    fn poll_fill_buf_impl(
        &mut self,
        cx: &mut Context<'_>,
        amnt: usize,
    ) -> Poll<std::io::Result<&[u8]>> {
        let buf = &mut self.buffer;
        loop {
            match buf {
                Buffer::Empty => {
                    let store = Arc::clone(&self.store);
                    let path = self.path.clone();
                    // Clamp both ends to the object size, seeking past the end is permitted
                    let start = self.cursor.min(self.size) as _;
                    let end = self.cursor.saturating_add(amnt as u64).min(self.size) as _;

                    if start == end {
                        return Poll::Ready(Ok(&[]));
                    }

                    *buf = Buffer::Pending(Box::pin(async move {
                        Ok(store.get_range(&path, start..end).await?)
                    }))
                }
                Buffer::Pending(fut) => match ready!(fut.poll_unpin(cx)) {
                    Ok(b) => *buf = Buffer::Ready(b),
                    Err(e) => {
                        // The future has completed and must not be polled again: discard
                        // it so the next poll retries instead of panicking
                        *buf = Buffer::Empty;
                        return Poll::Ready(Err(e));
                    }
                },
                Buffer::Ready(r) => return Poll::Ready(Ok(r)),
            }
        }
    }
}

impl AsyncSeek for BufReader {
    /// Move the cursor to `position` and discard any buffered data or in-flight request
    ///
    /// Seeking beyond the end of the object is permitted. An error of kind
    /// [`ErrorKind::InvalidInput`] is returned if the resulting offset cannot be
    /// represented as a `u64`, in which case the cursor and buffer are left untouched.
    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
        let invalid_input = |message: String| Error::new(ErrorKind::InvalidInput, message);

        let target = match position {
            SeekFrom::Start(offset) => offset,
            SeekFrom::End(offset) => match checked_add_signed(self.size, offset) {
                Some(target) => target,
                None => {
                    return Err(invalid_input(format!(
                        "Seeking {offset} from end of {} byte file would result in overflow",
                        self.size
                    )))
                }
            },
            SeekFrom::Current(offset) => match checked_add_signed(self.cursor, offset) {
                Some(target) => target,
                None => {
                    return Err(invalid_input(format!(
                        "Seeking {offset} from current offset of {} would result in overflow",
                        self.cursor
                    )))
                }
            },
        };

        // Only mutate state once the new position is known to be valid
        self.cursor = target;
        self.buffer = Buffer::Empty;
        Ok(())
    }

    /// Report the current cursor position
    ///
    /// Seeks take effect immediately in `start_seek`, so this is always ready.
    fn poll_complete(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<std::io::Result<u64>> {
        let position = self.cursor;
        Poll::Ready(Ok(position))
    }
}

impl AsyncRead for BufReader {
    /// Copy buffered bytes into `out`, fetching more data first if the buffer is empty
    ///
    /// At most one buffer's worth of data is copied per call; nothing is copied once the
    /// cursor is at or beyond the end of the object.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        out: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Request whichever is larger: the configured capacity or the space left in `out`
        let wanted = self.capacity.max(out.remaining());

        let available = match ready!(self.poll_fill_buf_impl(cx, wanted)) {
            Ok(available) => available,
            Err(e) => return Poll::Ready(Err(e)),
        };

        // Never copy more than `out` can hold; the remainder stays buffered
        let copied = available.len().min(out.remaining());
        out.put_slice(&available[..copied]);
        self.consume(copied);
        Poll::Ready(Ok(()))
    }
}

impl AsyncBufRead for BufReader {
    /// Return the buffered bytes, fetching up to `capacity` bytes if the buffer is empty
    fn poll_fill_buf(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::io::Result<&[u8]>> {
        let this = self.get_mut();
        let amnt = this.capacity;
        this.poll_fill_buf_impl(cx, amnt)
    }

    /// Mark `amt` buffered bytes as read, advancing the cursor by the same amount
    ///
    /// # Panics
    ///
    /// Panics if `amt` exceeds the number of buffered bytes, if `amt` is non-zero whilst
    /// the buffer is empty, or if a request is still in flight.
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
        // Work out the follow-on buffer state first, all panics happen before any mutation
        let remaining = match &self.buffer {
            Buffer::Pending(_) => panic!("cannot consume from pending buffer"),
            Buffer::Empty => {
                assert_eq!(amt, 0, "cannot consume from empty buffer");
                Buffer::Empty
            }
            Buffer::Ready(b) if amt > b.len() => {
                panic!("{amt} exceeds buffer sized of {}", b.len())
            }
            Buffer::Ready(b) if amt == b.len() => Buffer::Empty,
            Buffer::Ready(b) => Buffer::Ready(b.slice(amt..)),
        };

        self.buffer = remaining;
        self.cursor += amt as u64;
    }
}

/// Add a signed offset to an unsigned value, returning `None` on overflow or underflow
///
/// This is a port of the standard library's `u64::checked_add_signed`, which requires
/// Rust 1.66.
///
/// <https://github.com/rust-lang/rust/pull/87601/files#diff-b9390ee807a1dae3c3128dce36df56748ad8d23c6e361c0ebba4d744bf6efdb9R1533>
#[inline]
fn checked_add_signed(a: u64, rhs: i64) -> Option<u64> {
    if rhs.is_negative() {
        // `unsigned_abs` is well-defined for `i64::MIN`, unlike negating it
        a.checked_sub(rhs.unsigned_abs())
    } else {
        a.checked_add(rhs as u64)
    }
}

#[cfg(test)]
mod tests {
use super::*;
use crate::memory::InMemory;
use crate::path::Path;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt};

// Exercises reading, seeking and buffered access against an in-memory store
#[tokio::test]
async fn test_buf_reader() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

let existent = Path::from("exists.txt");
const BYTES: usize = 4096;

// The object is the 8-byte pattern "12345678" repeated to fill BYTES bytes
let data: Bytes = b"12345678".iter().cycle().copied().take(BYTES).collect();
store.put(&existent, data.clone()).await.unwrap();

let meta = store.head(&existent).await.unwrap();

// A sequential read with the default capacity should return the whole object
let mut reader = BufReader::new(Arc::clone(&store), &meta);
let mut out = Vec::with_capacity(BYTES);
let read = reader.read_to_end(&mut out).await.unwrap();

assert_eq!(read, BYTES);
assert_eq!(&out, &data);

// The cursor is now at the end (4096); seeking to a negative offset must fail
let err = reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap_err();
assert_eq!(err.to_string(), "Seeking -9223372036854775808 from current offset of 4096 would result in overflow");

reader.rewind().await.unwrap();

let err = reader.seek(SeekFrom::Current(-1)).await.unwrap_err();
assert_eq!(
err.to_string(),
"Seeking -1 from current offset of 0 would result in overflow"
);

// Seeking beyond the bounds of the file is permitted but should return no data
reader.seek(SeekFrom::Start(u64::MAX)).await.unwrap();
let buf = reader.fill_buf().await.unwrap();
assert!(buf.is_empty());

// Seeking forwards from u64::MAX would overflow the cursor
let err = reader.seek(SeekFrom::Current(1)).await.unwrap_err();
assert_eq!(err.to_string(), "Seeking 1 from current offset of 18446744073709551615 would result in overflow");

// Repeat the buffered-read checks with capacities smaller than, equal to and larger
// than the object. Every capacity is a multiple of 8, so each refill starts on a
// pattern boundary
for capacity in [200, 1024, 4096, DEFAULT_BUFFER_SIZE] {
let store = Arc::clone(&store);
let mut reader = BufReader::with_capacity(store, &meta, capacity);

// Consume the object 8 bytes at a time until fill_buf signals EOF
let mut bytes_read = 0;
loop {
let buf = reader.fill_buf().await.unwrap();
if buf.is_empty() {
assert_eq!(bytes_read, BYTES);
break;
}
assert!(buf.starts_with(b"12345678"));
bytes_read += 8;
reader.consume(8);
}

// Seek backwards from the end and read the final 76 bytes
let mut buf = Vec::with_capacity(76);
reader.seek(SeekFrom::Current(-76)).await.unwrap();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(&buf, &data[BYTES - 76..]);

// After a rewind a single fill yields at most `capacity` bytes from the start
reader.rewind().await.unwrap();
let buffer = reader.fill_buf().await.unwrap();
assert_eq!(buffer, &data[..capacity.min(BYTES)]);

// A fill after seeking to an unaligned offset is clamped to the end of the object
reader.seek(SeekFrom::Start(325)).await.unwrap();
let buffer = reader.fill_buf().await.unwrap();
assert_eq!(buffer, &data[325..(325 + capacity).min(BYTES)]);

// Seeking to the exact end yields no data
reader.seek(SeekFrom::End(0)).await.unwrap();
let buffer = reader.fill_buf().await.unwrap();
assert!(buffer.is_empty());
}
}
}
1 change: 1 addition & 0 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ compile_error!("Features 'gcp', 'aws', 'azure', 'http' are not supported on wasm
pub mod aws;
#[cfg(feature = "azure")]
pub mod azure;
pub mod buffered;
#[cfg(not(target_arch = "wasm32"))]
pub mod chunked;
pub mod delimited;
Expand Down