From 79ca4a6f5c54631ff27d676c85653ae6a68aa594 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 25 Sep 2023 16:25:02 +0100 Subject: [PATCH 1/5] Add ObjectStore BufReader (#4762) --- object_store/src/buffered.rs | 263 +++++++++++++++++++++++++++++++++++ object_store/src/lib.rs | 1 + 2 files changed, 264 insertions(+) create mode 100644 object_store/src/buffered.rs diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs new file mode 100644 index 000000000000..9c004be4bdbd --- /dev/null +++ b/object_store/src/buffered.rs @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utilities for performing tokio-style buffered IO + +use crate::path::Path; +use crate::{ObjectMeta, ObjectStore}; +use bytes::Bytes; +use futures::future::{BoxFuture, FutureExt}; +use futures::ready; +use std::cmp::Ordering; +use std::io::{Error, ErrorKind, SeekFrom}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, ReadBuf}; + +/// The default buffer size used by [`BufReader`] +pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024; + +/// An async-buffered reader compatible with the tokio IO traits +/// +/// Internally this maintains a buffer of the requested size, and uses [`ObjectStore::get_range`] +/// to populate its internal buffer once depleted. This buffer is cleared on seek. +/// +/// Whilst simple, this interface will typically be outperformed by the native [`ObjectStore`] +/// methods that better map to the network APIs. This is because most object stores have +/// very [high first-byte latencies], on the order of 100-200ms, and so avoiding unnecessary +/// round-trips is critical to throughput. +/// +/// Systems looking to sequentially scan a file should instead consider using [`ObjectStore::get`], +/// or [`ObjectStore::get_opts`], or [`ObjectStore::get_range`] to read a particular range. +/// +/// Systems looking to read multiple ranges of a file should instead consider using +/// [`ObjectStore::get_ranges`], which will optimise the vectored IO. +/// +/// [high first-byte latencies]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html +pub struct BufReader { + /// The object store to fetch data from + store: Arc, + /// The size of the object + size: u64, + /// The path to the object + path: Path, + /// The current position in the object + cursor: u64, + /// The number of bytes to read in a single request + capacity: usize, + /// The buffered data if any + buffer: Buffer, +} + +impl std::fmt::Debug for BufReader { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BufReader") + .field("path", &self.path) + .field("size", &self.size) + .field("capacity", &self.capacity) + .finish() + } +} + +enum Buffer { + Empty, + Pending(BoxFuture<'static, std::io::Result>), + Ready(Bytes), +} + +impl BufReader { + /// Create a new [`BufReader`] from the provided [`ObjectMeta`] and [`ObjectStore`] + pub fn new(store: Arc, meta: &ObjectMeta) -> Self { + Self::with_capacity(store, &meta, DEFAULT_BUFFER_SIZE) + } + + /// Create a new [`BufReader`] from the provided [`ObjectMeta`], [`ObjectStore`], and `capacity` + pub fn with_capacity( + store: Arc, + meta: &ObjectMeta, + capacity: usize, + ) -> Self { + Self { + path: meta.location.clone(), + size: meta.size as _, + store, + capacity, + cursor: 0, + buffer: Buffer::Empty, + } + } + + fn poll_fill_buf_impl( + &mut self, + cx: &mut Context<'_>, + amnt: usize, + ) -> Poll> { + let buf = &mut self.buffer; + loop { + match buf { + Buffer::Empty => { + let store = self.store.clone(); + let path = self.path.clone(); + let start = self.cursor.min(self.size) as _; + let end = self.cursor.saturating_add(amnt as u64).min(self.size) as _; + + if start == end { + return Poll::Ready(Ok(&[])); + } + + *buf = Buffer::Pending(Box::pin(async move { + Ok(store.get_range(&path, start..end).await?) + })) + } + Buffer::Pending(fut) => match ready!(fut.poll_unpin(cx)) { + Ok(b) => *buf = Buffer::Ready(b), + Err(e) => return Poll::Ready(Err(e)), + }, + Buffer::Ready(r) => return Poll::Ready(Ok(r)), + } + } + } +} + +impl AsyncSeek for BufReader { + fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> { + self.cursor = match position { + SeekFrom::Start(offset) => offset, + SeekFrom::End(offset) => { + self.size.checked_add_signed(offset).ok_or_else(|| Error::new(ErrorKind::InvalidInput, format!("Seeking {offset} from end of {} byte file would result in overflow", self.size)))? + } + SeekFrom::Current(offset) => { + self.cursor.checked_add_signed(offset).ok_or_else(|| Error::new(ErrorKind::InvalidInput, format!("Seeking {offset} from current offset of {} would result in overflow", self.cursor)))? + } + }; + self.buffer = Buffer::Empty; + Ok(()) + } + + fn poll_complete( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(self.cursor)) + } +} + +impl AsyncRead for BufReader { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + out: &mut ReadBuf<'_>, + ) -> Poll> { + // Read the maximum of the internal buffer and `out` + let to_read = out.remaining().max(self.capacity); + let r = match ready!(self.poll_fill_buf_impl(cx, to_read)) { + Ok(buf) => { + let to_consume = out.remaining().min(buf.len()); + out.put_slice(&buf[..to_consume]); + self.consume(to_consume); + Ok(()) + } + Err(e) => Err(e), + }; + Poll::Ready(r) + } +} + +impl AsyncBufRead for BufReader { + fn poll_fill_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let capacity = self.capacity; + self.get_mut().poll_fill_buf_impl(cx, capacity) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + match &mut self.buffer { + Buffer::Empty => assert_eq!(amt, 0, "cannot consume from empty buffer"), + Buffer::Ready(b) => match b.len().cmp(&amt) { + Ordering::Less => panic!("{amt} exceeds buffer sized of {}", b.len()), + Ordering::Greater => *b = b.slice(amt..), + Ordering::Equal => self.buffer = Buffer::Empty, + }, + Buffer::Pending(_) => panic!("cannot consume from pending buffer"), + } + self.cursor = self.cursor + amt as u64; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::memory::InMemory; + use crate::path::Path; + use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt}; + + #[tokio::test] + async fn test_buf_reader() { + let store = Arc::new(InMemory::new()); + + let existent = Path::from("exists.txt"); + const BYTES: usize = 4096; + + let data: Bytes = b"12345678".iter().cycle().copied().take(BYTES).collect(); + store.put(&existent, data.clone()).await.unwrap(); + + let meta = store.head(&existent).await.unwrap(); + + let mut reader = BufReader::new(store.clone(), &meta); + let mut out = Vec::with_capacity(BYTES); + let read = reader.read_to_end(&mut out).await.unwrap(); + + assert_eq!(read, BYTES); + assert_eq!(&out, &data); + + for capacity in [200, 1024, 4096, DEFAULT_BUFFER_SIZE] { + let mut reader = BufReader::with_capacity(store.clone(), &meta, capacity); + + let mut bytes_read = 0; + loop { + let buf = reader.fill_buf().await.unwrap(); + if buf.is_empty() { + assert_eq!(bytes_read, BYTES); + break; + } + assert!(buf.starts_with(b"12345678")); + bytes_read += 8; + reader.consume(8); + } + + let mut buf = Vec::with_capacity(76); + reader.seek(SeekFrom::Current(-76)).await.unwrap(); + reader.read_to_end(&mut buf).await.unwrap(); + assert_eq!(&buf, &data[BYTES - 76..]); + + reader.rewind().await.unwrap(); + let buffer = reader.fill_buf().await.unwrap(); + assert_eq!(buffer, &data[..capacity.min(BYTES)]); + + reader.seek(SeekFrom::Start(325)).await.unwrap(); + let buffer = reader.fill_buf().await.unwrap(); + assert_eq!(buffer, &data[325..(325 + capacity).min(BYTES)]); + + reader.seek(SeekFrom::End(0)).await.unwrap(); + let buffer = reader.fill_buf().await.unwrap(); + assert!(buffer.is_empty()); + } + } +} diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index cef10f1dd418..3fd363fd4f06 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -253,6 +253,7 @@ compile_error!("Features 'gcp', 'aws', 'azure', 'http' are not supported on wasm pub mod aws; #[cfg(feature = "azure")] pub mod azure; +pub mod buffered; #[cfg(not(target_arch = "wasm32"))] pub mod chunked; pub mod delimited; From 6751ae5948ec2f45bf6fb46953ce9771866c5291 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 25 Sep 2023 16:30:02 +0100 Subject: [PATCH 2/5] Clippy --- object_store/src/buffered.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index 9c004be4bdbd..8bbffef5361e 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -83,7 +83,7 @@ enum Buffer { impl BufReader { /// Create a new [`BufReader`] from the provided [`ObjectMeta`] and [`ObjectStore`] pub fn new(store: Arc, meta: &ObjectMeta) -> Self { - Self::with_capacity(store, &meta, DEFAULT_BUFFER_SIZE) + Self::with_capacity(store, meta, DEFAULT_BUFFER_SIZE) } /// Create a new [`BufReader`] from the provided [`ObjectMeta`], [`ObjectStore`], and `capacity` @@ -111,7 +111,7 @@ impl BufReader { loop { match buf { Buffer::Empty => { - let store = self.store.clone(); + let store = Arc::clone(&self.store); let path = self.path.clone(); let start = self.cursor.min(self.size) as _; let end = self.cursor.saturating_add(amnt as u64).min(self.size) as _; @@ -197,7 +197,7 @@ impl AsyncBufRead for BufReader { }, Buffer::Pending(_) => panic!("cannot consume from pending buffer"), } - self.cursor = self.cursor + amt as u64; + self.cursor += amt as u64; } } From bc89dff699e5c7a422ca080f3427645457c9c17f Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 25 Sep 2023 16:35:36 +0100 Subject: [PATCH 3/5] More Clippy --- object_store/src/buffered.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index 8bbffef5361e..bc94c919dcde 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -210,7 +210,7 @@ mod tests { #[tokio::test] async fn test_buf_reader() { - let store = Arc::new(InMemory::new()); + let store = Arc::new(InMemory::new()) as Arc; let existent = Path::from("exists.txt"); const BYTES: usize = 4096; @@ -220,7 +220,7 @@ mod tests { let meta = store.head(&existent).await.unwrap(); - let mut reader = BufReader::new(store.clone(), &meta); + let mut reader = BufReader::new(Arc::clone(&store), &meta); let mut out = Vec::with_capacity(BYTES); let read = reader.read_to_end(&mut out).await.unwrap(); @@ -228,7 +228,8 @@ mod tests { assert_eq!(&out, &data); for capacity in [200, 1024, 4096, DEFAULT_BUFFER_SIZE] { - let mut reader = BufReader::with_capacity(store.clone(), &meta, capacity); + let store = Arc::clone(&store); + let mut reader = BufReader::with_capacity(store, &meta, capacity); let mut bytes_read = 0; loop { From dd89ab8847084727d595da2f74c401fedb84b177 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 25 Sep 2023 17:06:14 +0100 Subject: [PATCH 4/5] Fix MSRV --- object_store/src/buffered.rs | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index bc94c919dcde..2e5ceaf27942 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -139,10 +139,10 @@ impl AsyncSeek for BufReader { self.cursor = match position { SeekFrom::Start(offset) => offset, SeekFrom::End(offset) => { - self.size.checked_add_signed(offset).ok_or_else(|| Error::new(ErrorKind::InvalidInput, format!("Seeking {offset} from end of {} byte file would result in overflow", self.size)))? + checked_add_signed(self.size,offset).ok_or_else(|| Error::new(ErrorKind::InvalidInput, format!("Seeking {offset} from end of {} byte file would result in overflow", self.size)))? } SeekFrom::Current(offset) => { - self.cursor.checked_add_signed(offset).ok_or_else(|| Error::new(ErrorKind::InvalidInput, format!("Seeking {offset} from current offset of {} would result in overflow", self.cursor)))? + checked_add_signed(self.cursor, offset).ok_or_else(|| Error::new(ErrorKind::InvalidInput, format!("Seeking {offset} from current offset of {} would result in overflow", self.cursor)))? } }; self.buffer = Buffer::Empty; @@ -201,6 +201,16 @@ impl AsyncBufRead for BufReader { } } +/// Port of standardised function as requires Rust 1.66 +/// +/// https://github.com/rust-lang/rust/pull/87601/files#diff-b9390ee807a1dae3c3128dce36df56748ad8d23c6e361c0ebba4d744bf6efdb9R1533 +#[inline] +fn checked_add_signed(a: u64, rhs: i64) -> Option { + let (res, overflowed) = a.overflowing_add(rhs as _); + let overflow = overflowed ^ (rhs < 0); + (!overflow).then_some(res) +} + #[cfg(test)] mod tests { use super::*; @@ -227,6 +237,25 @@ mod tests { assert_eq!(read, BYTES); assert_eq!(&out, &data); + let err = reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap_err(); + assert_eq!(err.to_string(), "Seeking -9223372036854775808 from current offset of 4096 would result in overflow"); + + reader.rewind().await.unwrap(); + + let err = reader.seek(SeekFrom::Current(-1)).await.unwrap_err(); + assert_eq!( + err.to_string(), + "Seeking -1 from current offset of 0 would result in overflow" + ); + + // Seeking beyond the bounds of the file is permitted but should return no data + reader.seek(SeekFrom::Start(u64::MAX)).await.unwrap(); + let buf = reader.fill_buf().await.unwrap(); + assert!(buf.is_empty()); + + let err = reader.seek(SeekFrom::Current(1)).await.unwrap_err(); + assert_eq!(err.to_string(), "Seeking 1 from current offset of 18446744073709551615 would result in overflow"); + for capacity in [200, 1024, 4096, DEFAULT_BUFFER_SIZE] { let store = Arc::clone(&store); let mut reader = BufReader::with_capacity(store, &meta, capacity); From 9ccd8b19ff49fdc319f6b739b7c46549250bf533 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 25 Sep 2023 17:12:33 +0100 Subject: [PATCH 5/5] Fix doc --- object_store/src/buffered.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index 2e5ceaf27942..bdc3f4c772b9 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -203,7 +203,7 @@ impl AsyncBufRead for BufReader { /// Port of standardised function as requires Rust 1.66 /// -/// https://github.com/rust-lang/rust/pull/87601/files#diff-b9390ee807a1dae3c3128dce36df56748ad8d23c6e361c0ebba4d744bf6efdb9R1533 +/// #[inline] fn checked_add_signed(a: u64, rhs: i64) -> Option { let (res, overflowed) = a.overflowing_add(rhs as _);