Skip to content

Commit

Permalink
Async ParquetExec
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jan 19, 2022
1 parent 6f7b2d2 commit afbc9df
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 280 deletions.
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,9 @@ members = [
[profile.release]
lto = true
codegen-units = 1

[patch.crates-io]
# TODO: TEMPORARY
arrow = { git = "https://github.com/tustvold/arrow-rs", rev = "7825ea86c425ad8f95664295a5ead576824bf832" }
arrow-flight = { git = "https://github.com/tustvold/arrow-rs", rev = "7825ea86c425ad8f95664295a5ead576824bf832" }
parquet = { git = "https://github.com/tustvold/arrow-rs", rev = "7825ea86c425ad8f95664295a5ead576824bf832" }
2 changes: 1 addition & 1 deletion datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ avro = ["avro-rs", "num-traits"]
ahash = { version = "0.7", default-features = false }
hashbrown = { version = "0.11", features = ["raw"] }
arrow = { version = "7.0.0", features = ["prettyprint"] }
parquet = { version = "7.0.0", features = ["arrow"] }
parquet = { version = "7.0.0", features = ["arrow", "async"] }
sqlparser = "0.13"
paste = "^1.0"
num_cpus = "1.13.0"
Expand Down
16 changes: 6 additions & 10 deletions datafusion/src/datasource/object_store/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;

use async_trait::async_trait;
use futures::{stream, AsyncRead, StreamExt};
use futures::{stream, StreamExt};

use crate::datasource::object_store::{
FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
ChunkReader, FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
};
use crate::datasource::PartitionedFile;
use crate::error::DataFusionError;
Expand Down Expand Up @@ -68,14 +68,10 @@ impl LocalFileReader {

#[async_trait]
impl ObjectReader for LocalFileReader {
async fn chunk_reader(
&self,
_start: u64,
_length: usize,
) -> Result<Box<dyn AsyncRead>> {
todo!(
"implement once async file readers are available (arrow-rs#78, arrow-rs#111)"
)
async fn chunk_reader(&self) -> Result<Box<dyn ChunkReader>> {
let file = tokio::fs::File::open(&self.file.path).await?;
let file = tokio::io::BufReader::new(file);
Ok(Box::new(file))
}

fn sync_chunk_reader(
Expand Down
29 changes: 25 additions & 4 deletions datafusion/src/datasource/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,42 @@ use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::{AsyncRead, Stream, StreamExt};
use futures::{Stream, StreamExt};
use tokio::io::{AsyncBufRead, AsyncSeek};

use local::LocalFileSystem;

use crate::error::{DataFusionError, Result};

/// Provides async access to read a file, combing [`AsyncSeek`]
/// and [`AsyncBufRead`] so they can be used as a trait object
///
/// [`AsyncSeek`] is necessary because readers may need to seek around whilst
/// reading, either because the format itself is structured (e.g. parquet)
/// or because it needs to read metadata or infer schema as an initial step
///
/// [`AsyncBufRead`] is necessary because readers may wish to read data
/// up until some delimiter (e.g. csv or newline-delimited JSON)
///
/// Note: the same block of data may be read multiple times
///
/// Implementations that fetch from object storage may wish to maintain an internal
/// buffer of fetched data blocks, potentially discarding them or spilling them to disk
/// based on memory pressure
///
/// TODO(#1614): Remove Sync
pub trait ChunkReader: AsyncBufRead + AsyncSeek + Send + Sync + Unpin {}
impl<T: AsyncBufRead + AsyncSeek + Send + Sync + Unpin> ChunkReader for T {}

/// Object Reader for one file in an object store.
///
/// Note that the dynamic dispatch on the reader might
/// have some performance impacts.
#[async_trait]
pub trait ObjectReader: Send + Sync {
/// Get reader for a part [start, start + length] in the file asynchronously
async fn chunk_reader(&self, start: u64, length: usize)
-> Result<Box<dyn AsyncRead>>;
/// Get a [`ChunkReader`] for the file, successive calls to this MUST
/// return readers with independent seek positions
async fn chunk_reader(&self) -> Result<Box<dyn ChunkReader>>;

/// Get reader for a part [start, start + length] in the file
fn sync_chunk_reader(
Expand Down
Loading

0 comments on commit afbc9df

Please sign in to comment.