Skip to content

Commit

Permalink
Implemented Reader trait that implements read_csv and will contain ot…
Browse files Browse the repository at this point in the history
…her readers.

The read_csv function can now take both strings and vectors of strings. This preserves backward compatibility while enabling the new feature.
  • Loading branch information
saikrishna1-bidgely committed Jan 21, 2023
1 parent 8b76e5f commit 654d770
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 43 deletions.
101 changes: 59 additions & 42 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,48 +646,6 @@ impl SessionContext {
))
}

/// Creates a [`DataFrame`] for reading one or more CSV data sources.
///
/// All paths are assumed to share one schema; when no schema is supplied
/// in `options`, it is inferred from the first path only.
///
/// # Errors
/// Returns a plan error when `table_paths` is empty, or when schema
/// inference is requested for an infinite data source.
pub async fn read_csv_with_multi_paths(
    &self,
    table_paths: &[impl AsRef<str>],
    options: CsvReadOptions<'_>,
) -> Result<DataFrame> {
    // Fail early with a plan error instead of panicking on
    // `table_paths[0]` during schema inference below.
    if table_paths.is_empty() {
        return Err(DataFusionError::Plan(
            "No table paths were provided".to_string(),
        ));
    }
    let table_paths = table_paths
        .iter()
        .map(ListingTableUrl::parse)
        .collect::<Result<Vec<ListingTableUrl>>>()?;
    let target_partitions = self.copied_config().target_partitions();
    let listing_options = options.to_listing_options(target_partitions);
    // Prefer the caller-provided schema; otherwise infer from the first
    // path. Inference cannot terminate on an infinite source.
    let resolved_schema = match (options.schema, options.infinite) {
        (Some(s), _) => Arc::new(s.to_owned()),
        (None, false) => {
            listing_options
                .infer_schema(&self.state(), &table_paths[0])
                .await?
        }
        (None, true) => {
            return Err(DataFusionError::Plan(
                "Schema inference for infinite data sources is not supported."
                    .to_string(),
            ))
        }
    };
    let config = ListingTableConfig::new_with_multi_paths(table_paths)
        .with_listing_options(listing_options)
        .with_schema(resolved_schema);

    let provider = ListingTable::try_new(config)?;
    self.read_table(Arc::new(provider))
}

/// Creates a [`DataFrame`] for reading a single CSV data source.
///
/// Convenience wrapper around [`Self::read_csv_with_multi_paths`] for the
/// common single-path case.
pub async fn read_csv(
    &self,
    table_path: impl AsRef<str>,
    options: CsvReadOptions<'_>,
) -> Result<DataFrame> {
    // Delegate to the multi-path variant with a one-element slice;
    // trailing expression instead of an explicit `return …;`.
    self.read_csv_with_multi_paths(&[table_path], options).await
}

/// Creates a [`DataFrame`] for reading a Parquet data source.
pub async fn read_parquet_with_multi_paths(
&self,
Expand Down Expand Up @@ -1079,6 +1037,65 @@ impl FunctionRegistry for SessionContext {
}
}

/// Polymorphic CSV-reading entry point for a session.
///
/// Implemented below for `&str`, `String`, `Vec<&str>` and `Vec<String>`
/// path arguments, so a single `read_csv` name accepts either one path or
/// a list of paths (backward compatible with the previous inherent method).
#[async_trait]
pub trait Reader<'a, T>: Sized {
    // NOTE(review): the `'a: 'async_trait` bound appears intended to tie
    // borrowed path data (e.g. `T = &'a str`) to the lifetime that
    // `#[async_trait]` gives the boxed future — confirm against the
    // async-trait crate's desugaring.
    async fn read_csv(&self, table_paths: T, options: CsvReadOptions<'_>) -> Result<DataFrame>
    where 'a:'async_trait
    ;
}

#[async_trait]
impl<'a> Reader<'a, &'a str> for SessionContext {
    /// Reads a single borrowed CSV path by wrapping it in a one-element
    /// vector and delegating to the `Vec<&str>` implementation.
    async fn read_csv(&self, table_path: &'a str, options: CsvReadOptions<'_>) -> Result<DataFrame> {
        let paths = vec![table_path];
        self.read_csv(paths, options).await
    }
}

#[async_trait]
impl<'a> Reader<'a, String> for SessionContext {
    /// Reads a single owned CSV path; borrows it as `&str` and delegates
    /// to the `Vec<&str>` implementation.
    async fn read_csv(&self, table_path: String, options: CsvReadOptions<'_>) -> Result<DataFrame> {
        let borrowed = table_path.as_str();
        self.read_csv(vec![borrowed], options).await
    }
}

#[async_trait]
impl<'a> Reader<'a, Vec<&'a str>> for SessionContext {
    /// Creates a [`DataFrame`] for reading one or more CSV data sources.
    ///
    /// All paths are assumed to share one schema; when no schema is
    /// supplied in `options`, it is inferred from the first path only.
    ///
    /// # Errors
    /// Returns a plan error when `table_paths` is empty, or when schema
    /// inference is requested for an infinite data source.
    async fn read_csv(&self, table_paths: Vec<&'a str>, options: CsvReadOptions<'_>) -> Result<DataFrame> {
        // Fail early with a plan error instead of panicking on
        // `table_paths[0]` during schema inference below.
        if table_paths.is_empty() {
            return Err(DataFusionError::Plan(
                "No table paths were provided".to_string(),
            ));
        }
        let table_paths = table_paths
            .iter()
            .map(ListingTableUrl::parse)
            .collect::<Result<Vec<ListingTableUrl>>>()?;
        let target_partitions = self.copied_config().target_partitions();
        let listing_options = options.to_listing_options(target_partitions);
        // Prefer the caller-provided schema; otherwise infer from the
        // first path. Inference cannot terminate on an infinite source.
        let resolved_schema = match (options.schema, options.infinite) {
            (Some(s), _) => Arc::new(s.to_owned()),
            (None, false) => {
                listing_options
                    .infer_schema(&self.state(), &table_paths[0])
                    .await?
            }
            (None, true) => {
                return Err(DataFusionError::Plan(
                    "Schema inference for infinite data sources is not supported."
                        .to_string(),
                ))
            }
        };
        let config = ListingTableConfig::new_with_multi_paths(table_paths)
            .with_listing_options(listing_options)
            .with_schema(resolved_schema);

        let provider = ListingTable::try_new(config)?;
        self.read_table(Arc::new(provider))
    }
}

#[async_trait]
impl<'a> Reader<'a, Vec<String>> for SessionContext {
    /// Borrows each owned path as `&str` and delegates to the
    /// `Vec<&str>` implementation.
    async fn read_csv(&self, table_paths: Vec<String>, options: CsvReadOptions<'_>) -> Result<DataFrame> {
        let borrowed: Vec<&str> = table_paths.iter().map(String::as_str).collect();
        self.read_csv(borrowed, options).await
    }
}
/// A planner used to add extensions to DataFusion logical and physical plans.
#[async_trait]
pub trait QueryPlanner {
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_plan/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ pub async fn plan_to_csv(
mod tests {
use super::*;
use crate::datasource::file_format::file_type::FileType;
use crate::execution::context::Reader;
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
use crate::physical_plan::file_format::partition_type_wrap;
use crate::prelude::*;
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_plan/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ mod tests {
use crate::datasource::file_format::{json::JsonFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::context::Reader;
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
use crate::prelude::NdJsonReadOptions;
use crate::prelude::*;
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,7 @@ mod tests {
use crate::datasource::file_format::test_util::scan_format;
use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::context::Reader;
use crate::execution::options::CsvReadOptions;
use crate::physical_plan::displayable;
use crate::physical_plan::file_format::partition_type_wrap;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1827,7 +1827,7 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
mod tests {
use super::*;
use crate::datasource::MemTable;
use crate::execution::context::TaskContext;
use crate::execution::context::{TaskContext, Reader};
use crate::execution::options::CsvReadOptions;
use crate::execution::runtime_env::RuntimeEnv;
use crate::physical_plan::SendableRecordBatchStream;
Expand Down

0 comments on commit 654d770

Please sign in to comment.