Decouple FileFormat from datafusion_data_access #2572

Merged · 4 commits · May 24, 2022
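
This change reworks the FileFormat trait so that schema and statistics inference take an ObjectStore handle plus FileMeta descriptors instead of the ObjectReaderStream / ObjectReader arguments they previously consumed: formats now resolve readers from the store themselves. The hand-rolled FileScanConfig setup duplicated across the format tests moves into a shared test_util::scan_format helper.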
85 changes: 30 additions & 55 deletions datafusion/core/src/datasource/file_format/avro.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
use futures::StreamExt;
use datafusion_data_access::FileMeta;

use super::FileFormat;
use crate::avro_to_arrow::read_avro_schema_from_reader;
@@ -32,7 +32,7 @@ use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
use datafusion_data_access::object_store::ObjectStore;

/// The default file extension of avro files
pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
@@ -46,10 +46,14 @@ impl FileFormat for AvroFormat {
self
}

async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
async fn infer_schema(
&self,
store: &Arc<dyn ObjectStore>,
files: &[FileMeta],
) -> Result<SchemaRef> {
let mut schemas = vec![];
while let Some(obj_reader) = readers.next().await {
let mut reader = obj_reader?.sync_reader()?;
for file in files {
let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
let schema = read_avro_schema_from_reader(&mut reader)?;
schemas.push(schema);
}
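
With the reworked signature, callers hand the format a store and per-file metadata rather than pre-opened readers. A minimal sketch of driving it from outside the crate, assuming a local store (the FileMeta and SizedFile fields match their usage in this diff; the import paths are assumptions for DataFusion of this vintage):

    use std::sync::Arc;

    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::datasource::file_format::avro::AvroFormat;
    use datafusion::datasource::file_format::FileFormat;
    use datafusion::error::Result;
    use datafusion_data_access::object_store::local::LocalFileSystem;
    use datafusion_data_access::object_store::ObjectStore;
    use datafusion_data_access::{FileMeta, SizedFile};

    // Infer the schema of one local Avro file through the decoupled API.
    async fn infer_local_avro(path: &str, size: u64) -> Result<SchemaRef> {
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem {});
        let meta = FileMeta {
            sized_file: SizedFile { path: path.to_string(), size },
            last_modified: None,
        };
        // The format opens its own reader via store.file_reader, one per FileMeta.
        AvroFormat {}.infer_schema(&store, &[meta]).await
    }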
@@ -59,8 +63,9 @@ impl FileFormat for AvroFormat {

async fn infer_stats(
&self,
_reader: Arc<dyn ObjectReader>,
_store: &Arc<dyn ObjectStore>,
_table_schema: SchemaRef,
_file: &FileMeta,
) -> Result<Statistics> {
Ok(Statistics::default())
}
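
Read together with the previous hunk, the inference half of the trait now looks as follows; this restates only the signatures visible in this diff, with supertraits and the untouched methods (as_any, create_physical_plan) elided:

    #[async_trait]
    pub trait FileFormat {
        /// Open each file via the store and merge the schemas found.
        async fn infer_schema(
            &self,
            store: &Arc<dyn ObjectStore>,
            files: &[FileMeta],
        ) -> Result<SchemaRef>;

        /// Compute per-file statistics, again resolving readers through the store.
        async fn infer_stats(
            &self,
            store: &Arc<dyn ObjectStore>,
            table_schema: SchemaRef,
            file: &FileMeta,
        ) -> Result<Statistics>;
    }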
@@ -78,15 +83,9 @@ impl FileFormat for AvroFormat {
#[cfg(test)]
#[cfg(feature = "avro")]
mod tests {
use crate::{
datafusion_data_access::object_store::local::{
local_object_reader, local_object_reader_stream, LocalFileSystem,
},
physical_plan::collect,
};

use super::*;
use crate::datasource::listing::local_unpartitioned_file;
use crate::datasource::file_format::test_util::scan_format;
use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{
BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
@@ -100,7 +99,7 @@ mod tests {
let ctx = SessionContext::with_config(config);
let task_ctx = ctx.task_ctx();
let projection = None;
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
let stream = exec.execute(0, task_ctx)?;

let tt_batches = stream
@@ -122,7 +121,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = None;
let exec = get_exec("alltypes_plain.avro", &projection, Some(1)).await?;
let exec = get_exec("alltypes_plain.avro", projection, Some(1)).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
assert_eq!(11, batches[0].num_columns());
@@ -136,7 +135,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = None;
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
let exec = get_exec("alltypes_plain.avro", projection, None).await?;

let x: Vec<String> = exec
.schema()
@@ -188,7 +187,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![1]);
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
let exec = get_exec("alltypes_plain.avro", projection, None).await?;

let batches = collect(exec, task_ctx).await?;
assert_eq!(batches.len(), 1);
@@ -218,7 +217,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![0]);
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
let exec = get_exec("alltypes_plain.avro", projection, None).await?;

let batches = collect(exec, task_ctx).await?;
assert_eq!(batches.len(), 1);
@@ -245,7 +244,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![10]);
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
let exec = get_exec("alltypes_plain.avro", projection, None).await?;

let batches = collect(exec, task_ctx).await?;
assert_eq!(batches.len(), 1);
@@ -272,7 +271,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![6]);
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
let exec = get_exec("alltypes_plain.avro", projection, None).await?;

let batches = collect(exec, task_ctx).await?;
assert_eq!(batches.len(), 1);
@@ -302,7 +301,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![7]);
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
let exec = get_exec("alltypes_plain.avro", projection, None).await?;

let batches = collect(exec, task_ctx).await?;
assert_eq!(batches.len(), 1);
@@ -332,7 +331,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![9]);
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
let exec = get_exec("alltypes_plain.avro", projection, None).await?;

let batches = collect(exec, task_ctx).await?;
assert_eq!(batches.len(), 1);
@@ -359,36 +358,13 @@ mod tests {

async fn get_exec(
file_name: &str,
projection: &Option<Vec<usize>>,
projection: Option<Vec<usize>>,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/{}", testdata, file_name);
let store_root = format!("{}/avro", testdata);
let format = AvroFormat {};
let file_schema = format
.infer_schema(local_object_reader_stream(vec![filename.clone()]))
.await
.expect("Schema inference");
let statistics = format
.infer_stats(local_object_reader(filename.clone()), file_schema.clone())
.await
.expect("Stats inference");
let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
let exec = format
.create_physical_plan(
FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
file_schema,
file_groups,
statistics,
projection: projection.clone(),
limit,
table_partition_cols: vec![],
},
&[],
)
.await?;
Ok(exec)
scan_format(&format, &store_root, file_name, projection, limit).await
}
}
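
scan_format itself lives in test_util.rs and is not part of this section; the sketch below reconstructs a plausible body from the setup code it replaces in both get_exec helpers, so the actual helper may differ in detail:

    use std::sync::Arc;

    use crate::datasource::listing::local_unpartitioned_file;
    use crate::error::Result;
    use crate::physical_plan::file_format::FileScanConfig;
    use crate::physical_plan::ExecutionPlan;
    use datafusion_data_access::object_store::local::LocalFileSystem;
    use datafusion_data_access::object_store::ObjectStore;

    use super::FileFormat;

    // Reconstructed sketch: infer schema and stats through the store, then plan.
    pub async fn scan_format(
        format: &dyn FileFormat,
        store_root: &str,
        file_name: &str,
        projection: Option<Vec<usize>>,
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let store = Arc::new(LocalFileSystem {}) as Arc<dyn ObjectStore>;
        let file = local_unpartitioned_file(format!("{}/{}", store_root, file_name));
        // PartitionedFile wraps a FileMeta (field name per this DataFusion era).
        let file_schema = format
            .infer_schema(&store, &[file.file_meta.clone()])
            .await?;
        let statistics = format
            .infer_stats(&store, file_schema.clone(), &file.file_meta)
            .await?;
        format
            .create_physical_plan(
                FileScanConfig {
                    object_store: store,
                    file_schema,
                    file_groups: vec![vec![file]],
                    statistics,
                    projection,
                    limit,
                    table_partition_cols: vec![],
                },
                &[],
            )
            .await
    }

Taking projection and limit by value rather than through &Option<Vec<usize>> is what lets every test call site drop its borrow, the other mechanical change running through the test hunks.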

@@ -397,18 +373,17 @@
mod tests {
use super::*;

use crate::datafusion_data_access::object_store::local::local_object_reader_stream;
use super::super::test_util::scan_format;
use crate::error::DataFusionError;

#[tokio::test]
async fn test() -> Result<()> {
let format = AvroFormat {};
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
let schema_result = AvroFormat {}
.infer_schema(local_object_reader_stream(vec![filename]))
.await;
let filename = "avro/alltypes_plain.avro";
let result = scan_format(&format, &testdata, filename, None, None).await;
assert!(matches!(
schema_result,
result,
Err(DataFusionError::NotImplemented(msg))
if msg == *"cannot read avro schema without the 'avro' feature enabled"
));
68 changes: 22 additions & 46 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
use futures::StreamExt;
use datafusion_data_access::FileMeta;

use super::FileFormat;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
@@ -32,7 +32,7 @@ use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
use datafusion_data_access::object_store::ObjectStore;

/// The default file extension of csv files
pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
@@ -93,13 +93,17 @@ impl FileFormat for CsvFormat {
self
}

async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
async fn infer_schema(
&self,
store: &Arc<dyn ObjectStore>,
files: &[FileMeta],
) -> Result<SchemaRef> {
let mut schemas = vec![];

let mut records_to_read = self.schema_infer_max_rec.unwrap_or(std::usize::MAX);
let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);

while let Some(obj_reader) = readers.next().await {
let mut reader = obj_reader?.sync_reader()?;
for file in files {
let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
&mut reader,
self.delimiter,
@@ -122,8 +126,9 @@

async fn infer_stats(
&self,
_reader: Arc<dyn ObjectReader>,
_store: &Arc<dyn ObjectStore>,
_table_schema: SchemaRef,
_file: &FileMeta,
) -> Result<Statistics> {
Ok(Statistics::default())
}
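
The CSV schema hunk above is cut off by the collapsed diff just after infer_reader_schema's first arguments. From the surviving records_to_read setup, the remainder of the loop plausibly spends a shared record budget across files, along these lines (a sketch of the loop body inside CsvFormat::infer_schema; the decrement-and-break details are assumptions):

    for file in files {
        let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
        // infer_reader_schema returns the schema plus how many records it read.
        let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
            &mut reader,
            self.delimiter,
            Some(records_to_read),
            self.has_header,
        )?;
        schemas.push(schema);
        records_to_read -= records_read;
        // Stop once the shared budget across all files is exhausted.
        if records_to_read == 0 {
            break;
        }
    }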
@@ -142,24 +147,19 @@
mod tests {
use arrow::array::StringArray;

use super::super::test_util::scan_format;
use super::*;
use crate::datasource::listing::local_unpartitioned_file;
use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
use crate::{
datafusion_data_access::object_store::local::{
local_object_reader, local_object_reader_stream, LocalFileSystem,
},
datasource::file_format::FileScanConfig,
physical_plan::collect,
};
use futures::StreamExt;

#[tokio::test]
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
let ctx = SessionContext::with_config(config);
// skip column 9, which overflows the automatically discovered column type of i64 (u64 would work)
let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]);
let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;
let exec = get_exec("aggregate_test_100.csv", projection, None).await?;
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx)?;

@@ -186,7 +186,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![0, 1, 2, 3]);
let exec = get_exec("aggregate_test_100.csv", &projection, Some(1)).await?;
let exec = get_exec("aggregate_test_100.csv", projection, Some(1)).await?;
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
assert_eq!(4, batches[0].num_columns());
@@ -198,7 +198,7 @@ mod tests {
#[tokio::test]
async fn infer_schema() -> Result<()> {
let projection = None;
let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;
let exec = get_exec("aggregate_test_100.csv", projection, None).await?;

let x: Vec<String> = exec
.schema()
@@ -233,7 +233,7 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![0]);
let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;
let exec = get_exec("aggregate_test_100.csv", projection, None).await?;

let batches = collect(exec, task_ctx).await.expect("Collect batches");

@@ -258,35 +258,11 @@

async fn get_exec(
file_name: &str,
projection: &Option<Vec<usize>>,
projection: Option<Vec<usize>>,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/csv/{}", testdata, file_name);
let root = format!("{}/csv", crate::test_util::arrow_test_data());
let format = CsvFormat::default();
let file_schema = format
.infer_schema(local_object_reader_stream(vec![filename.clone()]))
.await
.expect("Schema inference");
let statistics = format
.infer_stats(local_object_reader(filename.clone()), file_schema.clone())
.await
.expect("Stats inference");
let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
let exec = format
.create_physical_plan(
FileScanConfig {
object_store: Arc::new(LocalFileSystem),
file_schema,
file_groups,
statistics,
projection: projection.clone(),
limit,
table_partition_cols: vec![],
},
&[],
)
.await?;
Ok(exec)
scan_format(&format, &root, file_name, projection, limit).await
}
}