Improve scan perf by re-enable prefetching in ScanNode #473

Merged · 5 commits · Jan 27, 2023
17 changes: 15 additions & 2 deletions rust/benches/scan.rs
@@ -15,6 +15,17 @@
// specific language governing permissions and limitations
// under the License.

+//! Before running the benchmark, prepare a "test.lance" dataset in the
+//! `lance/rust` directory. There is no limitation on the dataset size,
+//! schema, or content.
+//!
+//! Run the benchmark:
+//! ```
+//! cargo bench --bench scan
+//! ```
+//!
+//! TODO: Take parameterized input to specify dataset URI from command line.

Contributor (on the doc comment above):
how to prepare though? python? rust script?

Contributor (Author):
You can use either Python or Rust to generate the dataset. It is also a good test for guaranteeing compatibility between the implementations.
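Following up on that review thread, here is a minimal Rust sketch of generating such a dataset. The single "i" column, the row count, and the exact `Dataset::write` signature are assumptions, not part of this PR — verify against the lance version you build, or use the Python bindings instead.

```
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema};
use lance::dataset::Dataset;

// Hedged sketch: write a small single-column dataset to ./test.lance.
// `Dataset::write(&mut reader, uri, params)` is assumed to match the lance
// crate at the time of this PR; check the current API before relying on it.
#[tokio::main]
async fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from_iter_values(0..100_000))],
    )
    .unwrap();
    let mut reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
    Dataset::write(&mut reader, "./test.lance", None).await.unwrap();
}
```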

use criterion::{criterion_group, criterion_main, Criterion};
use futures::stream::TryStreamExt;
use pprof::criterion::{Output, PProfProfiler};
@@ -27,11 +38,13 @@ fn bench_scan(c: &mut Criterion) {

let dataset = rt.block_on(async { Dataset::open("./test.lance").await.unwrap() });

c.bench_function("Scan datasets", |b| {
c.bench_function("Scan full dataset", |b| {
b.to_async(&rt).iter(|| async {
let count = dataset
.scan()
-.into_stream()
+.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
16 changes: 14 additions & 2 deletions rust/src/dataset.rs
@@ -454,9 +454,12 @@ mod tests {
use crate::utils::testing::generate_random_array;

use arrow_array::{
-DictionaryArray, FixedSizeListArray, Int32Array, RecordBatch, StringArray, UInt16Array,
+cast::as_struct_array, DictionaryArray, FixedSizeListArray, Int32Array, RecordBatch,
+StringArray, UInt16Array,
};
+use arrow_ord::sort::sort_to_indices;
use arrow_schema::{DataType, Field, Schema};
+use arrow_select::take::take;
use futures::stream::TryStreamExt;

use tempfile::tempdir;
@@ -519,7 +522,16 @@
.try_collect::<Vec<_>>()
.await
.unwrap();
-assert_eq!(expected_batches, actual_batches);
+// Sort rows before comparing: with prefetching, batches may arrive out of order.
+let actual_batch = concat_batches(&schema, &actual_batches).unwrap();
+let idx_arr = actual_batch.column_by_name("i").unwrap();
+let sorted_indices = sort_to_indices(idx_arr, None, None).unwrap();
+let struct_arr: StructArray = actual_batch.into();
+let sorted_arr = take(&struct_arr, &sorted_indices, None).unwrap();
+
+let expected_struct_arr: StructArray =
+concat_batches(&schema, &expected_batches).unwrap().into();
+assert_eq!(&expected_struct_arr, as_struct_array(sorted_arr.as_ref()));

// Each fragment has a different fragment ID
assert_eq!(
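The sort-before-compare step above is needed because, with prefetching re-enabled, batches can complete out of order (see the `buffer_unordered` change in scan.rs below). A self-contained sketch of the same normalization pattern, using only the arrow crates — the helper name and the "i" key column are illustrative, not from the PR:

```
use std::sync::Arc;

use arrow_array::{cast::as_struct_array, RecordBatch, StructArray};
use arrow_ord::sort::sort_to_indices;
use arrow_schema::Schema;
use arrow_select::{concat::concat_batches, take::take};

// Illustrative helper (not part of the PR): concatenate batches and sort the
// rows by the "i" column, so two scans can be compared regardless of the
// order in which their batches arrived.
fn normalize(schema: &Arc<Schema>, batches: &[RecordBatch]) -> StructArray {
    let batch = concat_batches(schema, batches).unwrap();
    let indices = sort_to_indices(batch.column_by_name("i").unwrap(), None, None).unwrap();
    let struct_arr: StructArray = batch.into();
    as_struct_array(take(&struct_arr, &indices, None).unwrap().as_ref()).clone()
}

// Usage: assert_eq!(normalize(&schema, &expected), normalize(&schema, &actual));
```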
12 changes: 8 additions & 4 deletions rust/src/error.rs
@@ -23,17 +23,20 @@ pub enum Error {
Schema(String),
IO(String),
Index(String),
+/// Stream early stop
+Stop(),
}

pub type Result<T> = std::result::Result<T, Error>;

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let (catalog, message) = match self {
-Self::Arrow(s) => ("Arrow", s),
-Self::Schema(s) => ("Schema", s),
-Self::IO(s) => ("I/O", s),
-Self::Index(s) => ("Index", s),
+Self::Arrow(s) => ("Arrow", s.as_str()),
+Self::Schema(s) => ("Schema", s.as_str()),
+Self::IO(s) => ("I/O", s.as_str()),
+Self::Index(s) => ("Index", s.as_str()),
+Self::Stop() => ("Early stop", ""),
};
write!(f, "LanceError({catalog}): {message}")
}
@@ -90,6 +93,7 @@ impl From<Error> for ArrowError {
Error::IO(err) => Self::IoError(err),
Error::Schema(err) => Self::SchemaError(err),
Error::Index(err) => Self::IoError(err),
+Error::Stop() => Self::IoError("early stop".to_string()),
}
}
}
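The new `Stop()` variant is not a real failure: it is a sentinel the scan loop uses to break out of a `try_for_each` once the receiving side of the channel hangs up, and it is swallowed rather than reported. A minimal sketch of that control-flow pattern with generic names (nothing here is lance-specific):

```
use futures::executor::block_on;
use futures::stream::{self, TryStreamExt};

enum Error {
    Stop, // sentinel: the consumer went away; stop producing — not an error
}

fn main() {
    let result = block_on(
        stream::iter((0..10).map(Ok::<i32, Error>)).try_for_each(|n| async move {
            if n == 3 {
                // Stand-in for "tx.send(...) failed because rx was dropped".
                return Err(Error::Stop);
            }
            println!("produced {n}");
            Ok(())
        }),
    );
    // Treat the sentinel as a clean shutdown, not a failure.
    match result {
        Ok(()) | Err(Error::Stop) => {}
    }
}
```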
21 changes: 13 additions & 8 deletions rust/src/io/exec/scan.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use arrow_array::RecordBatch;
-use futures::stream::Stream;
+use futures::stream::{self, Stream, StreamExt, TryStreamExt};
use object_store::path::Path;
use tokio::sync::mpsc::{self, Receiver};
use tokio::task::JoinHandle;
@@ -83,17 +83,22 @@ impl Scan {
break;
}
};
-for batch_id in 0..reader.num_batches() {
-let batch = reader.read_batch(batch_id as i32, ..).await;
-if tx.is_closed() {
-break;
-}
-if tx.send(batch).await.is_err() {
-// tx closed earlier.
+
+let r = &reader;
+match stream::iter(0..reader.num_batches())
+.map(|batch_id| async move { r.read_batch(batch_id as i32, ..).await })
+.buffer_unordered(prefetch_size)
+.try_for_each(|b| async { tx.send(Ok(b)).await.map_err(|_| Error::Stop()) })
+.await
+{
+Ok(_) | Err(Error::Stop()) => {}
+Err(e) => {
+eprintln!("Failed to scan data: {e}");
break;
}
}
-}

drop(tx)
});

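Putting the change together: instead of awaiting each `read_batch` serially, the producer maps batch ids to read futures and keeps up to `prefetch_size` of them in flight with `buffer_unordered`, forwarding results through the channel and using `Error::Stop` for a clean early exit. Below is a self-contained sketch of that pipeline shape, assuming a sleep as a stand-in for the actual read I/O; all names are illustrative, not the lance types:

```
use std::time::Duration;

use futures::stream::{self, StreamExt, TryStreamExt};
use tokio::sync::mpsc;

enum Error {
    Stop, // sentinel: receiver hung up
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<usize>(4);
    let prefetch_size = 8;

    let producer = tokio::spawn(async move {
        let result = stream::iter(0..32usize)
            .map(|batch_id| async move {
                // Stand-in for `reader.read_batch(batch_id, ..).await`.
                tokio::time::sleep(Duration::from_millis(10)).await;
                Ok::<usize, Error>(batch_id)
            })
            // Up to `prefetch_size` reads in flight; completions may come
            // back out of order, hence the sort added in the dataset test.
            .buffer_unordered(prefetch_size)
            .try_for_each(|b| async { tx.send(b).await.map_err(|_| Error::Stop) })
            .await;
        // Err(Error::Stop) just means the consumer went away — not a failure.
        let _ = result;
    });

    while let Some(batch_id) = rx.recv().await {
        println!("got batch {batch_id}");
    }
    producer.await.unwrap();
}
```

The design point of the PR is visible in the sketch: prefetch depth is bounded by `buffer_unordered(prefetch_size)` and backpressure by the channel capacity, so re-enabling prefetching cannot buffer the whole dataset in memory.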